// Scalar type passed to MPI depends on the build precision
#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE
#endif
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("machinefile", "machine file");
}
bool Foam::UPstream::init(int& argc, char**& argv)
{
    MPI_Init(&argc, &argv);

    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    int myRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (debug)
    {
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myRank:" << myRank << endl;
    }

    if (numprocs <= 1)
    {
        FatalErrorInFunction
            << "bool IPstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }

    // Attach an MPI buffer for buffered sends, sized by the
    // MPI_BUFFER_SIZE environment variable
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());

        if (bufferSize)
        {
            MPI_Buffer_attach(new char[bufferSize], bufferSize);
        }
    }
    else
    {
        FatalErrorInFunction
            << "UPstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
            << Foam::abort(FatalError);
    }

    return true;
}
void Foam::UPstream::exit(int errnum)
{
    // Detach and free the buffer attached in init()
    int size;
    char* buff;
    MPI_Buffer_detach(&buff, &size);
    delete[] buff;

    if (PstreamGlobals::outstandingRequests_.size())
    {
        label n = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.clear();

        WarningInFunction
            << "There are still " << n << " outstanding MPI_Requests." << endl
            << "This means that your code exited before doing a"
            << " UPstream::waitRequests()." << endl
            << "This should not happen for a normal code exit."
            << endl;
    }

    // Free any allocated communicators
    forAll(myProcNo_, communicator)
    {
        if (myProcNo_[communicator] != -1)
        {
            freePstreamCommunicator(communicator);
        }
    }

    if (errnum == 0)
    {
        MPI_Finalize();
        ::exit(errnum);
    }
    else
    {
        MPI_Abort(MPI_COMM_WORLD, errnum);
    }
}


void Foam::UPstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}
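For context, the attach/detach pair above wraps MPI's buffered-send mode. A standalone plain-MPI sketch (not OpenFOAM code; run with at least two ranks) of the buffer life cycle:

    #include <mpi.h>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        // Attach a buffer; MPI_BSEND_OVERHEAD covers per-message bookkeeping
        const int bufferSize = 1024*1024 + MPI_BSEND_OVERHEAD;
        MPI_Buffer_attach(new char[bufferSize], bufferSize);

        double x = 3.14;
        if (rank == 0)
        {
            // Buffered send: copies into the attached buffer and returns
            MPI_Bsend(&x, 1, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);
        }
        else if (rank == 1)
        {
            MPI_Recv(&x, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }

        // Detach waits for buffered messages to drain, then hands the
        // buffer back so it can be freed
        char* buff;
        int size;
        MPI_Buffer_detach(&buff, &size);
        delete[] buff;

        MPI_Finalize();
        return 0;
    }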
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << endl;
        error::printStack(Pout);
    }

    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}

// The minOp<scalar> overload is identical apart from the reduction op:
//     allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
// and the sumOp<vector2D> overload reduces both components in one call:
//     allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
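Underneath, these overloads resolve to MPI_Allreduce. A standalone sketch of the equivalent plain-MPI call (independent of OpenFOAM):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        // Each rank contributes one value; every rank receives the sum,
        // which is what reduce(value, sumOp<scalar>()) amounts to
        double local = rank + 1.0;
        double global = 0.0;
        MPI_Allreduce(&local, &global, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

        std::printf("rank %d: global sum = %g\n", rank, global);

        MPI_Finalize();
        return 0;
    }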
void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag,
    const label communicator
)
{
    // (the same "** reducing:" warnComm trace as above applies here)

    // Pack value and count into a vector2D so a single reduction sums both
    vector2D twoScalars(Value, scalar(Count));
    reduce(twoScalars, sumOp<vector2D>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
}
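The same packing trick in plain MPI, for illustration: one reduction carries both the sum and the count, from which a global average follows. The per-rank values here are made up:

    #include <mpi.h>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        // Hypothetical per-rank data: partial sum and element count
        double localSum = 10.0*(rank + 1);
        int localCount = rank + 1;

        // Pack both into one buffer so a single MPI_Allreduce sums them
        double sendBuf[2] = {localSum, double(localCount)};
        double recvBuf[2] = {0.0, 0.0};
        MPI_Allreduce(sendBuf, recvBuf, 2, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

        double globalAverage = recvBuf[0]/recvBuf[1];
        (void)globalAverage;

        MPI_Finalize();
        return 0;
    }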
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator,
    label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
    // Non-blocking collectives available: issue the reduce and record the
    // resulting MPI_Request in outstandingRequests_, returning its index
    // in requestID (the MPIX call and bookkeeping are elided here)

    if (debug)
    {
        Pout<< "UPstream::allocateRequest for non-blocking reduce"
            << " : request:" << requestID << endl;
    }
#else
    // No non-blocking collectives: fall back to the blocking reduce
    reduce(Value, bop, tag, communicator);
#endif
}
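MPI-3 standardised non-blocking collectives as MPI_Iallreduce; a standalone sketch of the start/overlap/complete pattern the code above anticipates:

    #include <mpi.h>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        double local = 1.0, global = 0.0;
        MPI_Request request;

        // Start the reduction without blocking ...
        MPI_Iallreduce(&local, &global, 1, MPI_DOUBLE, MPI_SUM,
                       MPI_COMM_WORLD, &request);

        // ... overlap useful work here ...

        // ... and complete it before using 'global'
        MPI_Wait(&request, MPI_STATUS_IGNORE);

        MPI_Finalize();
        return 0;
    }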
void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,
    const label index
)
{
    if (index == PstreamGlobals::MPIGroups_.size())
    {
        // Extend storage with dummy values
        MPI_Group newGroup = MPI_GROUP_NULL;
        PstreamGlobals::MPIGroups_.append(newGroup);
        MPI_Comm newComm = MPI_COMM_NULL;
        PstreamGlobals::MPICommunicators_.append(newComm);
    }
    else if (index > PstreamGlobals::MPIGroups_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
            << Foam::exit(FatalError);
    }

    if (parentIndex == -1)
    {
        // World communicator: must sit at the fixed index
        if (index != UPstream::worldComm)
        {
            FatalErrorInFunction
                << "world communicator should always be index "
                << UPstream::worldComm << Foam::exit(FatalError);
        }

        // Every rank of MPI_COMM_WORLD participates
        procIDs_[index].setSize(numProcs);
        forAll(procIDs_[index], i)
        {
            procIDs_[index][i] = i;
        }
    }
    else
    {
        // Sub-communicator: group holding only the selected parent ranks
        // (see the plain-MPI sketch at the end of this page for the
        // equivalent calls)
        MPI_Group_incl
        (
            PstreamGlobals::MPIGroups_[parentIndex],
            procIDs_[index].size(),
            procIDs_[index].begin(),
            &PstreamGlobals::MPIGroups_[index]
        );

        // MPI_Comm_create then builds the communicator; ranks outside
        // the group receive MPI_COMM_NULL
        if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
        {
            myProcNo_[index] = -1;
        }
        else if (MPI_Comm_rank(PstreamGlobals::MPICommunicators_[index], &myProcNo_[index]))
        {
            FatalErrorInFunction
                << "Problem :"
                << " when allocating communicator at " << index
                << " from ranks " << procIDs_[index]
                << " of parent " << parentIndex
                << " cannot find my own rank"
                << Foam::exit(FatalError);
        }
    }
}
void Foam::UPstream::waitRequests(const label start)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size() - start
            << " outstanding requests starting at " << start << endl;
    }

    if (PstreamGlobals::outstandingRequests_.size())
    {
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        if (MPI_Waitall(waitRequests.size(), waitRequests.begin(), MPI_STATUSES_IGNORE))
        {
            FatalErrorInFunction
                << "MPI_Waitall returned with error" << Foam::endl;
        }

        resetRequests(start);
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}
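The intended calling pattern, sketched as an OpenFOAM-style fragment (the surrounding solver code and the non-blocking transfers themselves are assumed, not shown):

    // Record the current number of outstanding requests
    label nReqsBefore = UPstream::nRequests();

    // ... issue non-blocking sends/receives here; each one appends an
    // MPI_Request to outstandingRequests_ ...

    // Block until everything issued since the marker has completed
    UPstream::waitRequests(nReqsBefore);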
void Foam::UPstream::waitRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequest : starting wait for request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << endl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    if (MPI_Wait(&PstreamGlobals::outstandingRequests_[i], MPI_STATUS_IGNORE))
    {
        FatalErrorInFunction
            << "MPI_Wait returned with error" << Foam::endl;
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequest : finished wait for request:" << i
            << endl;
    }
}
bool Foam::UPstream::finishedRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::finishedRequest : checking request:" << i << endl;
    }

    // Same bounds check as waitRequest: i beyond the outstanding send
    // requests is fatal ("Maybe you are mixing blocking/non-blocking comms?")

    int flag;
    MPI_Test(&PstreamGlobals::outstandingRequests_[i], &flag, MPI_STATUS_IGNORE);

    if (debug)
    {
        Pout<< "UPstream::finishedRequest : finished request:" << i << endl;
    }

    return flag != 0;
}
int Foam::UPstream::allocateTag(const char* s)
{
    // Reuse a previously freed tag if possible, otherwise take a new one
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        Pout<< "UPstream::allocateTag " << s << " tag:" << tag << endl;
    }

    return tag;
}

// The word overload, allocateTag(const word& s), is identical.


void Foam::UPstream::freeTag(const char* s, const int tag)
{
    if (debug)
    {
        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}

// The word overload, freeTag(const word& s, const int tag), is identical.
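A hedged usage sketch of the tag pool ("exchange" is an illustrative label, not from this file): allocate a tag for a communication episode, use it to match the messages, then return it for reuse:

    const int tag = UPstream::allocateTag("exchange");

    // ... use 'tag' in the matching sends and receives ...

    UPstream::freeTag("exchange", tag);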
Referenced symbols:

static int allocateTag(const char *)
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be replicated on other operating systems.
static void resetRequests(const label sz)
Truncate number of outstanding requests.
A class for handling words, derived from string.
#define forAll(list, i)
Loop across all elements in list.
static void exit(int errnum=1)
Exit program.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static void abort()
Abort program.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static void freePstreamCommunicator(const label index)
Free a communicator.
DynamicList< MPI_Comm > MPICommunicators_
static void allocatePstreamCommunicator(const label parentIndex, const label index)
Allocate a communicator with index.
void allReduce(Type &Value, int count, MPI_Datatype MPIType, MPI_Op op, const BinaryOp &bop, const int tag, const label communicator)
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
static void printStack(Ostream &)
Helper function to print a stack.
string getEnv(const word &)
Return environment variable of given name.
typedef intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
DynamicList< int > freedTags_
static void waitRequest(const label i)
Wait until request i has finished.
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid options for this type of communications library.
DynamicList< MPI_Group > MPIGroups_
errorManip< error > abort(error &err)
Various functions to wrap MPI_Allreduce.
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
errorManipArg< error, int > exit(error &err, const int errNo=1)
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static label nRequests()
Get number of outstanding requests.
prefixOSstream Pout(cout, "Pout")
Vector2D< scalar > vector2D
vector2D obtained from generic Vector2D
static label worldComm
Default communicator (all processors)
DynamicList< MPI_Request > outstandingRequests_
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
T remove()
Remove and return the top element.
void sumReduce(T &Value, label &Count, const int tag=Pstream::msgType(), const label comm=UPstream::worldComm)
#define WarningInFunction
Report a warning using Foam::Warning.
static bool init(int &argc, char **&argv)
Initialisation function called from main.
static void freeTag(const char *, const int tag)
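For reference, the group-then-communicator sequence performed by allocatePstreamCommunicator corresponds to this standalone plain-MPI sketch (assumes at least two ranks; the choice of ranks 0 and 1 is illustrative):

    #include <mpi.h>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        // Build a communicator containing only world ranks 0 and 1
        MPI_Group worldGroup, subGroup;
        MPI_Comm subComm;
        int ranks[2] = {0, 1};

        MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
        MPI_Group_incl(worldGroup, 2, ranks, &subGroup);
        MPI_Comm_create(MPI_COMM_WORLD, subGroup, &subComm);

        if (subComm == MPI_COMM_NULL)
        {
            // This rank (>= 2) is not a member of the new communicator
        }
        else
        {
            MPI_Comm_free(&subComm);
        }

        MPI_Group_free(&subGroup);
        MPI_Group_free(&worldGroup);

        MPI_Finalize();
        return 0;
    }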