UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
6  \\/ M anipulation | Copyright (C) 2015 OpenCFD Ltd.
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "debug.H"
28 #include "registerSwitch.H"
29 #include "dictionary.H"
30 #include "IOstreams.H"
31 
32 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
33 
// Type-name/debug registration for UPstream plus the text names used for the
// communication-type enumeration.
34 namespace Foam
35 {
36  defineTypeNameAndDebug(UPstream, 0);
37 
// Text names of the NamedEnum entries, in declaration order: "blocking",
// "scheduled", "nonBlocking". The array holds three names, matching the
// size template argument 3.
// NOTE(review): the first template argument (the enumeration type) is not
// visible in this listing - a line appears to be missing between '<' and '3'.
38  template<>
39  const char* Foam::NamedEnum
40  <
42  3
43  >::names[] =
44  {
45  "blocking",
46  "scheduled",
47  "nonBlocking"
48  };
49 }
50 
51 
54 
55 
56 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
57 
// Switch the library between serial and parallel operation.
//
// nProcs == 0 : serial. parRun_ is cleared, a top-level communicator
//               (parent -1) holding the single rank 0 is allocated without
//               the Pstream-level allocation (doPstream = false), and the
//               Pout/Perr prefixes are emptied.
// otherwise   : parallel. parRun_ is set, a top-level communicator holding
//               ranks identity(nProcs) is allocated with doPstream = true,
//               and Pout/Perr are prefixed with "[<rank>] ".
//
// In both branches the index returned by allocateCommunicator() is compared
// against UPstream::worldComm and a "problem" message is emitted on mismatch.
// NOTE(review): the statement that opens each of those stream expressions,
// and its terminator, are not visible in this listing (lines are missing
// around the '<<' continuations).
58 void Foam::UPstream::setParRun(const label nProcs)
59 {
60  if (nProcs == 0)
61  {
62  parRun_ = false;
    // Single-rank communicator {0}; no parent (-1), no Pstream allocation
64  label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
    // The freshly allocated index is expected to coincide with worldComm
65  if (comm != UPstream::worldComm)
66  {
68  << "problem : comm:" << comm
69  << " UPstream::worldComm:" << UPstream::worldComm
71  }
72 
    // Serial output carries no processor prefix
73  Pout.prefix() = "";
74  Perr.prefix() = "";
75  }
76  else
77  {
78  parRun_ = true;
79 
80  // Redo worldComm communicator (this has been created at static
81  // initialisation time)
83  label comm = allocateCommunicator(-1, identity(nProcs), true);
    // Same consistency check as in the serial branch
84  if (comm != UPstream::worldComm)
85  {
87  << "problem : comm:" << comm
88  << " UPstream::worldComm:" << UPstream::worldComm
90  }
91 
    // Tag all Pout/Perr output with this process' rank in worldComm
92  Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
93  Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
94  }
95 }
96 
97 
// Build the linear (flat) communication schedule for nProcs processors:
// processor 0 is the master and every other processor talks only to it.
//
// Returns a list with one commsStruct per processor.
//
// The commsStruct arguments appear to be (nProcs, own rank, rank above,
// ranks directly below, all ranks below) - inferred from how they are filled
// here; confirm against the commsStruct declaration.
//
// NOTE(review): the code assumes nProcs >= 1. With nProcs == 0 it would ask
// for a labelList of size -1 and assign element 0 of an empty list.
// NOTE(review): the line carrying the function's return type and name is not
// visible in this listing.
99 (
100  const label nProcs
101 )
102 {
103  List<commsStruct> linearCommunication(nProcs);
104 
105  // Master
    // Ranks 1 .. nProcs-1 are all directly below the master
106  labelList belowIDs(nProcs - 1);
107  forAll(belowIDs, i)
108  {
109  belowIDs[i] = i + 1;
110  }
111 
    // Master entry: nothing above (-1), belowIDs below, empty last list
112  linearCommunication[0] = commsStruct
113  (
114  nProcs,
115  0,
116  -1,
117  belowIDs,
118  labelList(0)
119  );
120 
121  // Slaves. Have no below processors, only communicate up to master
122  for (label procID = 1; procID < nProcs; procID++)
123  {
124  linearCommunication[procID] = commsStruct
125  (
126  nProcs,
127  procID,
128  0,
129  labelList(0),
130  labelList(0)
131  );
132  }
133  return linearCommunication;
134 }
135 
136 
137 // Append my children (and my children's children etc.) to allReceives.
//
// procID      : processor whose descendants are collected
// receives    : per processor, the processors it receives from directly
// allReceives : output list; entries are appended in depth-first order
//               (each child is followed immediately by its own descendants)
//
// Recurses once per descendant. Termination relies on 'receives' describing
// a tree (no cycles).
// NOTE(review): the line carrying the function's return type and name is not
// visible in this listing.
139 (
140  const label procID,
141  const List<DynamicList<label> >& receives,
142  DynamicList<label>& allReceives
143 )
144 {
145  const DynamicList<label>& myChildren = receives[procID];
146 
147  forAll(myChildren, childI)
148  {
    // Record the child itself, then everything below it
149  allReceives.append(myChildren[childI]);
150  collectReceives(myChildren[childI], receives, allReceives);
151  }
152 }
153 
154 
155 // Tree like schedule. For 8 procs:
156 // (level 0)
157 // 0 receives from 1
158 // 2 receives from 3
159 // 4 receives from 5
160 // 6 receives from 7
161 // (level 1)
162 // 0 receives from 2
163 // 4 receives from 6
164 // (level 2)
165 // 0 receives from 4
166 //
167 // The sends/receives for all levels are collected per processor (one send per
168 // processor; multiple receives possible) creating a table:
169 //
170 // So per processor:
171 // proc receives from sends to
172 // ---- ------------- --------
173 // 0 1,2,4 -
174 // 1 - 0
175 // 2 3 0
176 // 3 - 2
177 // 4 5,6 0
178 // 5 - 4
179 // 6 7 4
180 // 7 - 6
// Build the tree communication schedule for nProcs processors.
//
// Returns a list with one commsStruct per processor, filled from: the
// processor it sends to (-1 if none), the processors it receives from
// directly, and all processors below it in the tree.
// NOTE(review): the line carrying the function's return type and name is not
// visible in this listing.
182 (
183  label nProcs
184 )
185 {
    // Smallest nLevels (at least 1) with 2^nLevels >= nProcs
186  label nLevels = 1;
187  while ((1 << nLevels) < nProcs)
188  {
189  nLevels++;
190  }
191 
    // receives[p]: processors p receives from directly
    // sends[p]   : the processor p sends to, -1 when it sends to nobody
192  List<DynamicList<label> > receives(nProcs);
193  labelList sends(nProcs, -1);
194 
195  // Info<< "Using " << nLevels << " communication levels" << endl;
196 
    // offset     : spacing between receiving processors on this level
    // childOffset: distance from a receiver to its sender (offset/2)
197  label offset = 2;
198  label childOffset = offset/2;
199 
200  for (label level = 0; level < nLevels; level++)
201  {
202  label receiveID = 0;
203  while (receiveID < nProcs)
204  {
205  // Determine processor that sends and we receive from
206  label sendID = receiveID + childOffset;
207 
    // The partner may not exist when nProcs is not a power of two
208  if (sendID < nProcs)
209  {
210  receives[receiveID].append(sendID);
211  sends[sendID] = receiveID;
212  }
213 
214  receiveID += offset;
215  }
216 
    // Both spacings double on the next level
217  offset <<= 1;
218  childOffset <<= 1;
219  }
220 
221  // For all processors find the processors it receives data from
222  // (and the processors they receive data from etc.)
223  List<DynamicList<label> > allReceives(nProcs);
224  for (label procID = 0; procID < nProcs; procID++)
225  {
226  collectReceives(procID, receives, allReceives[procID]);
227  }
228 
229 
230  List<commsStruct> treeCommunication(nProcs);
231 
    // Assemble the per-processor structures from the tables built above
232  for (label procID = 0; procID < nProcs; procID++)
233  {
234  treeCommunication[procID] = commsStruct
235  (
236  nProcs,
237  procID,
238  sends[procID],
239  receives[procID].shrink(),
240  allReceives[procID].shrink()
241  );
242  }
243  return treeCommunication;
244 }
245 
246 
// Allocate a communicator and return its index.
//
// parentIndex : index of the parent communicator (stored as given; callers
//               in this file pass -1 for a top-level communicator)
// subRanks    : ranks that make up the new communicator; must be strictly
//               increasing
// doPstream   : when true and parRun() is true,
//               allocatePstreamCommunicator(parentIndex, index) is called too
//
// An index released earlier is reused if one is available on freeComms_;
// otherwise every per-communicator list grows by one entry. The linear and
// tree schedules for the new communicator are computed here.
// NOTE(review): the line carrying the function's return type and name is not
// visible in this listing.
248 (
249  const label parentIndex,
250  const labelList& subRanks,
251  const bool doPstream
252 )
253 {
254  label index;
    // Prefer recycling a previously freed slot
255  if (!freeComms_.empty())
256  {
257  index = freeComms_.pop();
258  }
259  else
260  {
261  // Extend storage
262  index = parentCommunicator_.size();
263 
    // Placeholder entries; the real values are assigned below
264  myProcNo_.append(-1);
265  procIDs_.append(List<int>(0));
266  parentCommunicator_.append(-1);
267  linearCommunication_.append(List<commsStruct>(0));
268  treeCommunication_.append(List<commsStruct>(0));
269  }
270 
271  if (debug)
272  {
273  Pout<< "Communicators : Allocating communicator " << index << endl
274  << " parent : " << parentIndex << endl
275  << " procs : " << subRanks << endl
276  << endl;
277  }
278 
279  // Initialise; overwritten by allocatePstreamCommunicator
280  myProcNo_[index] = 0;
281 
282  // Convert from label to int
283  procIDs_[index].setSize(subRanks.size());
284  forAll(procIDs_[index], i)
285  {
286  procIDs_[index][i] = subRanks[i];
287 
288  // Enforce incremental order (so index is rank in next communicator)
    // NOTE(review): the statement that opens the message below, and its
    // terminator, are not visible in this listing.
289  if (i >= 1 && subRanks[i] <= subRanks[i-1])
290  {
292  << "subranks not sorted : " << subRanks
293  << " when allocating subcommunicator from parent "
294  << parentIndex
296  }
297  }
298  parentCommunicator_[index] = parentIndex;
299 
    // Both schedules depend only on the number of ranks
300  linearCommunication_[index] = calcLinearComm(procIDs_[index].size());
301  treeCommunication_[index] = calcTreeComm(procIDs_[index].size());
302 
303 
304  if (doPstream && parRun())
305  {
306  allocatePstreamCommunicator(parentIndex, index);
307  }
308 
309  return index;
310 }
311 
312 
// Release the communicator with index 'communicator'.
//
// communicator : index to release
// doPstream    : when true and parRun() is true,
//                freePstreamCommunicator(communicator) is called first
//
// Resets the bookkeeping entries of the slot and pushes the index onto
// freeComms_ so that a later allocateCommunicator() can reuse it.
// NOTE(review): the line carrying the function's return type and name is not
// visible in this listing.
314 (
315  const label communicator,
316  const bool doPstream
317 )
318 {
319  if (debug)
320  {
321  Pout<< "Communicators : Freeing communicator " << communicator << endl
322  << " parent : " << parentCommunicator_[communicator] << endl
323  << " myProcNo : " << myProcNo_[communicator] << endl
324  << endl;
325  }
326 
327  if (doPstream && parRun())
328  {
329  freePstreamCommunicator(communicator);
330  }
    // -1 marks the slot as unused; freeCommunicators() tests for this value
331  myProcNo_[communicator] = -1;
    // NOTE(review): procIDs_ is deliberately or accidentally left untouched
    // (the clear() below is commented out); the reason is not visible here.
332  //procIDs_[communicator].clear();
333  parentCommunicator_[communicator] = -1;
334  linearCommunication_[communicator].clear();
335  treeCommunication_[communicator].clear();
336 
    // Make the index available for reuse
337  freeComms_.push(communicator);
338 }
339 
340 
341 void Foam::UPstream::freeCommunicators(const bool doPstream)
342 {
343  forAll(myProcNo_, communicator)
344  {
345  if (myProcNo_[communicator] != -1)
346  {
347  freeCommunicator(communicator, doPstream);
348  }
349  }
350 }
351 
352 
353 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
354 {
355  int procID = myProcID;
356  label comm = myComm;
357 
358  while (parent(comm) != -1)
359  {
360  const List<int>& parentRanks = UPstream::procID(comm);
361  procID = parentRanks[procID];
362  comm = UPstream::parent(comm);
363  }
364 
365  return procID;
366 }
367 
368 
369 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
370 {
371  const List<int>& parentRanks = procID(myComm);
372  label parentComm = parent(myComm);
373 
374  if (parentComm == -1)
375  {
376  return findIndex(parentRanks, baseProcID);
377  }
378  else
379  {
380  label parentRank = procNo(parentComm, baseProcID);
381  return findIndex(parentRanks, parentRank);
382  }
383 }
384 
385 
// Convert a rank expressed relative to currentComm into the corresponding
// rank in myComm, going via the base (top-level) rank.
//
// myComm        : communicator to express the result in
// currentComm   : communicator that currentProcID is relative to
// currentProcID : rank within currentComm
//
// NOTE(review): the line carrying the function's return type and name is not
// visible in this listing; presumably this is a three-argument overload of
// procNo - confirm against UPstream.H.
387 (
388  const label myComm,
389  const label currentComm,
390  const int currentProcID
391 )
392 {
    // Step 1: up to the top-level rank; step 2: down into myComm
393  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
394  return procNo(myComm, physProcID);
395 }
396 
397 
398 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
399 
400 // By default this is not a parallel run
401 bool Foam::UPstream::parRun_(false);
402 
403 // Free communicators
405 
406 // My processor number
408 
409 // List of process IDs
411 
412 // Parent communicator
414 
415 // Standard transfer message type
417 
418 // Linear communication schedule
421 
422 // Multi level communication schedule
425 
426 
427 // Allocate a serial communicator. This gets overwritten in parallel mode
428 // (by UPstream::setParRun())
430 (
431  -1,
433  false
434 );
435 
436 
437 
438 // Should compact transfer be used in which floats replace doubles
439 // reducing the bandwidth requirement at the expense of some loss
440 // in accuracy
442 (
443  Foam::debug::optimisationSwitch("floatTransfer", 0)
444 );
446 (
447  "floatTransfer",
448  bool,
450 );
451 
452 // Number of processors at which the reduce algorithm changes from linear to
453 // tree
455 (
456  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
457 );
459 (
460  "nProcsSimpleSum",
461  int,
463 );
464 
465 // Default commsType
467 (
468  commsTypeNames.read(Foam::debug::optimisationSwitches().lookup("commsType"))
469 );
470 
471 namespace Foam
472 {
473  // Register re-reader
475  :
477  {
478  public:
479 
480  addcommsTypeToOpt(const char* name)
481  :
483  {}
484 
486  {}
487 
488  virtual void readData(Foam::Istream& is)
489  {
491  (
492  is
493  );
494  }
495 
496  virtual void writeData(Foam::Ostream& os) const
497  {
499  }
500  };
501 
502  addcommsTypeToOpt addcommsTypeToOpt_("commsType");
503 }
504 
505 // Default communicator
507 
508 
509 // Warn for use of any communicator
511 
512 
513 // Number of polling cycles in processor updates
515 (
516  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
517 );
519 (
520  "nPollProcInterfaces",
521  int,
523 );
524 
525 
526 // ************************************************************************* //
Foam::addcommsTypeToOpt::readData
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:488
Foam::UPstream::procIDs_
static DynamicList< List< int > > procIDs_
Definition: UPstream.H:190
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:261
IOstreams.H
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
debug.H
Foam::labelList
List< label > labelList
A List of labels.
Definition: labelList.H:56
Foam::UPstream::baseProcNo
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in.
Definition: UPstream.C:353
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:406
UPstream.H
Foam::findIndex
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index.
Foam::DynamicList< label >
Foam::UPstream::msgType_
static int msgType_
Definition: UPstream.H:184
Foam::UPstream::nProcs
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:387
Foam::addcommsTypeToOpt
Definition: UPstream.C:474
serialComm
Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, Foam::label(0)), false)
Foam::UPstream::linearCommunication_
static DynamicList< List< commsStruct > > linearCommunication_
Definition: UPstream.H:193
Foam::dictionary::lookup
ITstream & lookup(const word &, bool recursive=false, bool patternMatch=true) const
Find and return an entry data stream.
Definition: dictionary.C:449
Foam::Perr
prefixOSstream Perr(cerr, "Perr")
Definition: IOstreams.H:54
Foam::UPstream::commsTypeNames
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
Foam::UPstream::freeComms_
static LIFOStack< label > freeComms_
Definition: UPstream.H:188
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
Foam::UPstream::defaultCommsType
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:252
Foam::addcommsTypeToOpt::addcommsTypeToOpt
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:480
Foam::debug::optimisationSwitch
int optimisationSwitch(const char *name, const int defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:182
Foam::UPstream::allocateCommunicator
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:248
Foam::UPstream::setParRun
static void setParRun(const label nProcs)
Set data for parallel running. Special case nProcs=0 to switch off.
Definition: UPstream.C:58
Foam::label
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
Foam::Istream
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition: Istream.H:57
Foam::List::append
void append(const T &)
Append an element at the end of the list.
Foam::addcommsTypeToOpt::~addcommsTypeToOpt
virtual ~addcommsTypeToOpt()
Definition: UPstream.C:485
Foam::UPstream::floatTransfer
static bool floatTransfer
Should compact transfer be used in which floats replace doubles.
Definition: UPstream.H:245
Foam::UPstream::procID
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:416
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:314
Foam::addcommsTypeToOpt_
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
Foam::identity
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
Foam::UPstream::parRun_
static bool parRun_
Definition: UPstream.H:183
Foam::FatalError
error FatalError
Foam
Namespace for OpenFOAM.
Definition: combustionModel.C:30
registerOptSwitch
registerOptSwitch("floatTransfer", bool, Foam::UPstream::floatTransfer)
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:131
Foam::UPstream::commsStruct
Structure for communicating between processors.
Definition: UPstream.H:76
Foam::DynamicList::append
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:405
Foam::UPstream::nProcsSimpleSum
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear.
Definition: UPstream.H:249
Foam::NamedEnum::read
Enum read(Istream &) const
Read a word from Istream and return the corresponding.
Definition: NamedEnum.C:61
Foam::UPstream::commsTypes
commsTypes
Types of communications.
Definition: UPstream.H:64
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:318
Foam::addcommsTypeToOpt::writeData
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:496
Foam::UPstream::collectReceives
static void collectReceives(const label procID, const List< DynamicList< label > > &receives, DynamicList< label > &allReceives)
Helper function for tree communication schedule determination.
Definition: UPstream.C:139
Foam::Pout
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
Foam::UPstream::worldComm
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:258
Foam::List
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: HashTable.H:59
Foam::UPstream::procNo
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor.
Definition: UPstream.C:369
dictionary.H
Foam::UPstream::freeCommunicators
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:341
Foam::UPstream::calcLinearComm
static List< commsStruct > calcLinearComm(const label nProcs)
Calculate linear communication schedule.
Definition: UPstream.C:99
Foam::UPstream::myProcNo_
static DynamicList< int > myProcNo_
Definition: UPstream.H:189
registerSwitch.H
Foam::List::size
void size(const label)
Override size to be inconsistent with allocated storage.
Foam::Ostream
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition: Ostream.H:53
Foam::UPstream::parent
static label parent(const label communicator)
Definition: UPstream.H:410
Foam::UPstream::communicator
Helper class for allocating/freeing communicators.
Definition: UPstream.H:294
Foam::LIFOStack
A LIFO stack based on a singly-linked list.
Definition: LIFOStack.H:51
Foam::UPstream::parentCommunicator_
static DynamicList< label > parentCommunicator_
Definition: UPstream.H:191
Foam::debug::optimisationSwitches
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict.
Definition: debug.C:158
Foam::defineTypeNameAndDebug
defineTypeNameAndDebug(combustionModel, 0)
Foam::UPstream::calcTreeComm
static List< commsStruct > calcTreeComm(const label nProcs)
Calculate tree communication schedule.
Definition: UPstream.C:182
Foam::prefixOSstream::prefix
const string & prefix() const
Return the prefix of the stream.
Definition: prefixOSstream.H:86
Foam::debug::addOptimisationObject
void addOptimisationObject(const char *name, simpleRegIOobject *obj)
Register optimisation switch read/write object.
Definition: debug.C:234
Foam::simpleRegIOobject
Abstract base class for registered object with I/O. Used in debug symbol registration.
Definition: simpleRegIOobject.H:50
Foam::name
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
Foam::NamedEnum
Initialise the NamedEnum HashTable from the static list of names.
Definition: NamedEnum.H:52
Foam::UPstream::nPollProcInterfaces
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:255
Foam::UPstream::treeCommunication_
static DynamicList< List< commsStruct > > treeCommunication_
Definition: UPstream.H:194