UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "PstreamReduceOps.H"
28 #include "OSspecific.H"
29 #include "PstreamGlobals.H"
30 #include "SubList.H"
31 #include "allReduce.H"
32 
33 #include <mpi.h>
34 
35 #include <cstring>
36 #include <cstdlib>
37 #include <csignal>
38 
39 #if defined(WM_SP)
40 # define MPI_SCALAR MPI_FLOAT
41 #elif defined(WM_DP)
42 # define MPI_SCALAR MPI_DOUBLE
43 #endif
44 
45 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
46 
47 // NOTE:
48 // valid parallel options vary between implementations, but flag common ones.
49 // if they are not removed by MPI_Init(), the subsequent argument processing
50 // will notice that they are wrong
51 void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
52 {
53  validParOptions.insert("np", "");
54  validParOptions.insert("p4pg", "PI file");
55  validParOptions.insert("p4wd", "directory");
56  validParOptions.insert("p4amslave", "");
57  validParOptions.insert("p4yourname", "hostname");
58  validParOptions.insert("machinefile", "machine file");
59 }
60 
61 
62 bool Foam::UPstream::init(int& argc, char**& argv)
63 {
64  MPI_Init(&argc, &argv);
65 
66  int numprocs;
67  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
68  int myRank;
69  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
70 
71  if (debug)
72  {
73  Pout<< "UPstream::init : initialised with numProcs:" << numprocs
74  << " myRank:" << myRank << endl;
75  }
76 
77  if (numprocs <= 1)
78  {
80  << "bool IPstream::init(int& argc, char**& argv) : "
81  "attempt to run parallel on 1 processor"
83  }
84 
85 
86  // Initialise parallel structure
87  setParRun(numprocs);
88 
89 # ifndef SGIMPI
90  string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
91 
92  if (bufferSizeName.size())
93  {
94  int bufferSize = atoi(bufferSizeName.c_str());
95 
96  if (bufferSize)
97  {
98  MPI_Buffer_attach(new char[bufferSize], bufferSize);
99  }
100  }
101  else
102  {
104  << "UPstream::init(int& argc, char**& argv) : "
105  << "environment variable MPI_BUFFER_SIZE not defined"
107  }
108 # endif
109 
110  //int processorNameLen;
111  //char processorName[MPI_MAX_PROCESSOR_NAME];
112  //
113  //MPI_Get_processor_name(processorName, &processorNameLen);
114  //processorName[processorNameLen] = '\0';
115  //Pout<< "Processor name:" << processorName << endl;
116 
117  return true;
118 }
119 
120 
121 void Foam::UPstream::exit(int errnum)
122 {
123  if (debug)
124  {
125  Pout<< "UPstream::exit." << endl;
126  }
127 
128 # ifndef SGIMPI
129  int size;
130  char* buff;
131  MPI_Buffer_detach(&buff, &size);
132  delete[] buff;
133 # endif
134 
136  {
139 
141  << "There are still " << n << " outstanding MPI_Requests." << endl
142  << "This means that your code exited before doing a"
143  << " UPstream::waitRequests()." << endl
144  << "This should not happen for a normal code exit."
145  << endl;
146  }
147 
148  // Clean mpi communicators
149  forAll(myProcNo_, communicator)
150  {
151  if (myProcNo_[communicator] != -1)
152  {
153  freePstreamCommunicator(communicator);
154  }
155  }
156 
157  if (errnum == 0)
158  {
159  MPI_Finalize();
160  ::exit(errnum);
161  }
162  else
163  {
164  MPI_Abort(MPI_COMM_WORLD, errnum);
165  }
166 }
167 
168 
170 {
171  MPI_Abort(MPI_COMM_WORLD, 1);
172 }
173 
174 
175 void Foam::reduce
176 (
177  scalar& Value,
178  const sumOp<scalar>& bop,
179  const int tag,
180  const label communicator
181 )
182 {
183  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
184  {
185  Pout<< "** reducing:" << Value << " with comm:" << communicator
186  << " warnComm:" << UPstream::warnComm
187  << endl;
189  }
190  allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
191 }
192 
193 
194 void Foam::reduce
195 (
196  scalar& Value,
197  const minOp<scalar>& bop,
198  const int tag,
199  const label communicator
200 )
201 {
202  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
203  {
204  Pout<< "** reducing:" << Value << " with comm:" << communicator
205  << " warnComm:" << UPstream::warnComm
206  << endl;
208  }
209  allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
210 }
211 
212 
213 void Foam::reduce
214 (
215  vector2D& Value,
216  const sumOp<vector2D>& bop,
217  const int tag,
218  const label communicator
219 )
220 {
221  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
222  {
223  Pout<< "** reducing:" << Value << " with comm:" << communicator
224  << " warnComm:" << UPstream::warnComm
225  << endl;
227  }
228  allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
229 }
230 
231 
232 void Foam::sumReduce
233 (
234  scalar& Value,
235  label& Count,
236  const int tag,
237  const label communicator
238 )
239 {
240  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
241  {
242  Pout<< "** reducing:" << Value << " with comm:" << communicator
243  << " warnComm:" << UPstream::warnComm
244  << endl;
246  }
247  vector2D twoScalars(Value, scalar(Count));
248  reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
249 
250  Value = twoScalars.x();
251  Count = twoScalars.y();
252 }
253 
254 
// Non-blocking sum-reduce of a single scalar. When the MPI implementation
// provides the MPIX non-blocking collective extensions the reduction is
// started asynchronously and requestID is set to the index of the pending
// request (complete it with UPstream::waitRequest). Otherwise falls back
// to the blocking reduce and sets requestID to -1.
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator,
    label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
    // Assume mpich2 with non-blocking collectives extensions. Once mpi3
    // is available this will change.
    MPI_Request request;
    scalar v = Value;
    // NOTE(review): v is a stack local used as the send buffer of a
    // non-blocking reduce; it goes out of scope before the request is
    // waited on — looks unsafe, confirm against the MPIX semantics.
    MPIX_Ireduce
    (
        &v,
        &Value,
        1,
        MPI_SCALAR,
        MPI_SUM,
        0, //root
        PstreamGlobals::MPICommunicators_[communicator],
        &request
    );

    // Record the pending request so callers can wait on it later
    requestID = PstreamGlobals::outstandingRequests_.size();
    PstreamGlobals::outstandingRequests_.append(request);

    if (UPstream::debug)
    {
        Pout<< "UPstream::allocateRequest for non-blocking reduce"
            << " : request:" << requestID
            << endl;
    }
#else
    // Non-blocking not yet implemented in mpi
    reduce(Value, bop, tag, communicator);
    requestID = -1;
#endif
}
296 
297 
299 (
300  const label parentIndex,
301  const label index
302 )
303 {
304  if (index == PstreamGlobals::MPIGroups_.size())
305  {
306  // Extend storage with dummy values
307  MPI_Group newGroup = MPI_GROUP_NULL;
308  PstreamGlobals::MPIGroups_.append(newGroup);
309  MPI_Comm newComm = MPI_COMM_NULL;
310  PstreamGlobals::MPICommunicators_.append(newComm);
311  }
312  else if (index > PstreamGlobals::MPIGroups_.size())
313  {
315  << "PstreamGlobals out of sync with UPstream data. Problem."
317  }
318 
319 
320  if (parentIndex == -1)
321  {
322  // Allocate world communicator
323 
324  if (index != UPstream::worldComm)
325  {
327  << "world communicator should always be index "
329  }
330 
331  PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
332  MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
333  MPI_Comm_rank
334  (
336  &myProcNo_[index]
337  );
338 
339  // Set the number of processes to the actual number
340  int numProcs;
341  MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
342 
343  //procIDs_[index] = identity(numProcs);
344  procIDs_[index].setSize(numProcs);
345  forAll(procIDs_[index], i)
346  {
347  procIDs_[index][i] = i;
348  }
349  }
350  else
351  {
352  // Create new group
353  MPI_Group_incl
354  (
355  PstreamGlobals::MPIGroups_[parentIndex],
356  procIDs_[index].size(),
357  procIDs_[index].begin(),
359  );
360 
361  // Create new communicator
362  MPI_Comm_create
363  (
367  );
368 
369  if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
370  {
371  myProcNo_[index] = -1;
372  }
373  else
374  {
375  if
376  (
377  MPI_Comm_rank
378  (
380  &myProcNo_[index]
381  )
382  )
383  {
385  << "Problem :"
386  << " when allocating communicator at " << index
387  << " from ranks " << procIDs_[index]
388  << " of parent " << parentIndex
389  << " cannot find my own rank"
391  }
392  }
393  }
394 }
395 
396 
397 void Foam::UPstream::freePstreamCommunicator(const label communicator)
398 {
399  if (communicator != UPstream::worldComm)
400  {
401  if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
402  {
403  // Free communicator. Sets communicator to MPI_COMM_NULL
404  MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
405  }
406  if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
407  {
408  // Free greoup. Sets group to MPI_GROUP_NULL
409  MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
410  }
411  }
412 }
413 
414 
416 {
418 }
419 
420 
422 {
424  {
426  }
427 }
428 
429 
430 void Foam::UPstream::waitRequests(const label start)
431 {
432  if (debug)
433  {
434  Pout<< "UPstream::waitRequests : starting wait for "
436  << " outstanding requests starting at " << start << endl;
437  }
438 
440  {
441  SubList<MPI_Request> waitRequests
442  (
445  start
446  );
447 
448  if
449  (
450  MPI_Waitall
451  (
452  waitRequests.size(),
453  waitRequests.begin(),
454  MPI_STATUSES_IGNORE
455  )
456  )
457  {
459  << "MPI_Waitall returned with error" << Foam::endl;
460  }
461 
462  resetRequests(start);
463  }
464 
465  if (debug)
466  {
467  Pout<< "UPstream::waitRequests : finished wait." << endl;
468  }
469 }
470 
471 
473 {
474  if (debug)
475  {
476  Pout<< "UPstream::waitRequest : starting wait for request:" << i
477  << endl;
478  }
479 
480  if (i >= PstreamGlobals::outstandingRequests_.size())
481  {
483  << "There are " << PstreamGlobals::outstandingRequests_.size()
484  << " outstanding send requests and you are asking for i=" << i
485  << nl
486  << "Maybe you are mixing blocking/non-blocking comms?"
488  }
489 
490  if
491  (
492  MPI_Wait
493  (
495  MPI_STATUS_IGNORE
496  )
497  )
498  {
500  << "MPI_Wait returned with error" << Foam::endl;
501  }
502 
503  if (debug)
504  {
505  Pout<< "UPstream::waitRequest : finished wait for request:" << i
506  << endl;
507  }
508 }
509 
510 
512 {
513  if (debug)
514  {
515  Pout<< "UPstream::finishedRequest : checking request:" << i
516  << endl;
517  }
518 
519  if (i >= PstreamGlobals::outstandingRequests_.size())
520  {
522  << "There are " << PstreamGlobals::outstandingRequests_.size()
523  << " outstanding send requests and you are asking for i=" << i
524  << nl
525  << "Maybe you are mixing blocking/non-blocking comms?"
527  }
528 
529  int flag;
530  MPI_Test
531  (
533  &flag,
534  MPI_STATUS_IGNORE
535  );
536 
537  if (debug)
538  {
539  Pout<< "UPstream::finishedRequest : finished request:" << i
540  << endl;
541  }
542 
543  return flag != 0;
544 }
545 
546 
548 {
549  int tag;
550  if (PstreamGlobals::freedTags_.size())
551  {
553  }
554  else
555  {
556  tag = PstreamGlobals::nTags_++;
557  }
558 
559  if (debug)
560  {
561  //if (UPstream::lateBlocking > 0)
562  //{
563  // string& poutp = Pout.prefix();
564  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
565  // Perr.prefix() = Pout.prefix();
566  //}
567  Pout<< "UPstream::allocateTag " << s
568  << " : tag:" << tag
569  << endl;
570  }
571 
572  return tag;
573 }
574 
575 
577 {
578  int tag;
579  if (PstreamGlobals::freedTags_.size())
580  {
582  }
583  else
584  {
585  tag = PstreamGlobals::nTags_++;
586  }
587 
588  if (debug)
589  {
590  //if (UPstream::lateBlocking > 0)
591  //{
592  // string& poutp = Pout.prefix();
593  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
594  // Perr.prefix() = Pout.prefix();
595  //}
596  Pout<< "UPstream::allocateTag " << s
597  << " : tag:" << tag
598  << endl;
599  }
600 
601  return tag;
602 }
603 
604 
605 void Foam::UPstream::freeTag(const char* s, const int tag)
606 {
607  if (debug)
608  {
609  //if (UPstream::lateBlocking > 0)
610  //{
611  // string& poutp = Pout.prefix();
612  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
613  // Perr.prefix() = Pout.prefix();
614  //}
615  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
616  }
618 }
619 
620 
621 void Foam::UPstream::freeTag(const word& s, const int tag)
622 {
623  if (debug)
624  {
625  //if (UPstream::lateBlocking > 0)
626  //{
627  // string& poutp = Pout.prefix();
628  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
629  // Perr.prefix() = Pout.prefix();
630  //}
631  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
632  }
634 }
635 
636 
637 // ************************************************************************* //
Foam::UPstream::allocateTag
static int allocateTag(const char *)
Definition: UPstream.C:547
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:261
OSspecific.H
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
Foam::UPstream::resetRequests
static void resetRequests(const label sz)
Truncate number of outstanding requests.
Definition: UPstream.C:102
SubList.H
Foam::word
A class for handling words, derived from string.
Definition: word.H:59
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:406
UPstream.H
Foam::UPstream::exit
static void exit(int errnum=1)
Exit program.
Definition: UPstream.C:46
Foam::UPstream::waitRequests
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:106
Foam::UPstream::abort
static void abort()
Abort program.
Definition: UPstream.C:52
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
Foam::UPstream::freePstreamCommunicator
static void freePstreamCommunicator(const label index)
Free a communicator.
Definition: UPstream.C:92
Foam::PstreamGlobals::MPICommunicators_
DynamicList< MPI_Comm > MPICommunicators_
Foam::UPstream::allocatePstreamCommunicator
static void allocatePstreamCommunicator(const label parentIndex, const label index)
Allocate a communicator with index.
Definition: UPstream.C:85
n
label n
Definition: TABSMDCalcMethod2.H:31
Foam::allReduce
void allReduce(Type &Value, int count, MPI_Datatype MPIType, MPI_Op op, const BinaryOp &bop, const int tag, const label communicator)
Definition: allReduceTemplates.C:32
Foam::reduce
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
Definition: PstreamReduceOps.H:43
Foam::error::printStack
static void printStack(Ostream &)
Helper function to print a stack.
Definition: dummyPrintStack.C:30
Foam::getEnv
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:101
Foam::label
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
Foam::PstreamGlobals::freedTags_
DynamicList< int > freedTags_
Foam::nl
static const char nl
Definition: Ostream.H:260
Foam::UPstream::waitRequest
static void waitRequest(const label i)
Wait until request i has finished.
Definition: UPstream.C:110
Foam::UPstream::addValidParOptions
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid option this type of communications library.
Definition: UPstream.C:31
Foam::PstreamGlobals::MPIGroups_
DynamicList< MPI_Group > MPIGroups_
Foam::FatalError
error FatalError
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:131
s
gmvFile<< "tracers "<< particles.size()<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().x()<< " ";}gmvFile<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().y()<< " ";}gmvFile<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().z()<< " ";}gmvFile<< nl;forAll(lagrangianScalarNames, i){ word name=lagrangianScalarNames[i];IOField< scalar > s(IOobject(name, runTime.timeName(), cloud::prefix, mesh, IOobject::MUST_READ, IOobject::NO_WRITE))
PstreamReduceOps.H
allReduce.H
Various functions to wrap MPI_Allreduce.
Foam::DynamicList::append
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
PstreamGlobals.H
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:318
Foam::UPstream::nRequests
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:96
Foam::Pout
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
Foam::vector2D
Vector2D< scalar > vector2D
vector2D obtained from generic Vector2D
Definition: vector2D.H:49
Foam::UPstream::worldComm
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:258
Foam::PstreamGlobals::outstandingRequests_
DynamicList< MPI_Request > outstandingRequests_
Foam::UPstream::finishedRequest
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
Definition: UPstream.C:114
Foam::DynamicList::remove
T remove()
Remove and return the top element.
Definition: DynamicListI.H:365
Foam::sumReduce
void sumReduce(T &Value, label &Count, const int tag=Pstream::msgType(), const label comm=UPstream::worldComm)
Definition: PstreamReduceOps.H:125
Foam::PstreamGlobals::nTags_
int nTags_
WarningInFunction
#define WarningInFunction
Report a warning using Foam::Warning.
Definition: messageStream.H:259
Foam::UPstream::init
static bool init(int &argc, char **&argv)
Initialisation function called from main.
Definition: UPstream.C:35
Foam::UPstream::freeTag
static void freeTag(const char *, const int tag)
Definition: UPstream.C:605