helperFunctionsPar.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | cfMesh: A library for mesh generation
4  \\ / O peration |
5  \\ / A nd | Author: Franjo Juretic (franjo.juretic@c-fields.com)
6  \\/ M anipulation | Copyright (C) Creative Fields, Ltd.
7 -------------------------------------------------------------------------------
8 License
9  This file is part of cfMesh.
10 
11  cfMesh is free software; you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by the
13  Free Software Foundation; either version 3 of the License, or (at your
14  option) any later version.
15 
16  cfMesh is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with cfMesh. If not, see <http://www.gnu.org/licenses/>.
23 
24 Description
25 
26 \*---------------------------------------------------------------------------*/
27 
28 #include "error.H"
29 #include "helperFunctionsPar.H"
30 #include "DynList.H"
31 #include "labelPair.H"
32 #include "HashSet.H"
33 
34 # ifdef USE_OMP
35 #include <omp.h>
36 # endif
37 
38 namespace Foam
39 {
40 
41 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *//
42 
43 namespace help
44 {
45 
46 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *//
47 
48 template<class sendOp, class combineOp, class T, class ListType>
49 void combineData(const sendOp& sop, combineOp& cop)
50 {
51  std::map<label, LongList<T> > sMap;
52  sop(sMap);
53 
55 
56  exchangeMap(sMap, data);
57 
58  cop(data);
59 }
60 
61 template<class T, class ListType, class scatterOp, class gatherOp>
62 void whisperReduce(const ListType& neis, const scatterOp& sop, gatherOp& gop)
63 {
64  DynList<label> below, above;
65  forAll(neis, i)
66  {
67  if( neis[i] < Pstream::myProcNo() )
68  {
69  above.append(neis[i]);
70  }
71  else if( neis[i] > Pstream::myProcNo() )
72  {
73  below.append(neis[i]);
74  }
75  }
76 
77  //- receive the data from the processors above
78  forAll(above, aboveI)
79  {
80  //- receive the data
81  List<T> receivedData;
82  IPstream fromOtherProc(Pstream::blocking, above[aboveI]);
83  fromOtherProc >> receivedData;
84 
85  gop(receivedData);
86  }
87 
88  //- send the data to the processors below
89  forAll(below, belowI)
90  {
91  const label neiProc = below[belowI];
92 
93  LongList<T> dts;
94  sop(dts);
95 
96  //- send the data
97  OPstream toOtherProc(Pstream::blocking, neiProc, dts.byteSize());
98  toOtherProc << dts;
99  }
100 
101  //- gather the data from the processors below to the processors above
102  //- receive the data from the processors below
103  forAllReverse(below, belowI)
104  {
105  //- receive the data
106  List<T> receivedData;
107  IPstream fromOtherProc(Pstream::blocking, below[belowI]);
108  fromOtherProc >> receivedData;
109 
110  gop(receivedData);
111  }
112 
113  //- send the data to the processors below
114  forAllReverse(above, aboveI)
115  {
116  const label neiProc = above[aboveI];
117 
118  LongList<T> dts;
119  sop(dts);
120 
121  //- send the data
122  OPstream toOtherProc(Pstream::blocking, neiProc, dts.byteSize());
123  toOtherProc << dts;
124  }
125 }
126 
127 template<class T, class ListType>
128 void exchangeMap
129 (
130  const std::map<label, ListType>& m,
131  LongList<T>& data,
132  const Pstream::commsTypes commsType
133 )
134 {
135  data.clear();
136 
137  if( !contiguous<T>() )
138  FatalError << "Data is not contiguous" << exit(FatalError);
139 
140  typename std::map<label, ListType>::const_iterator iter;
141 
142  //- check which processors shall exchange the data and which ones shall not
143  labelHashSet receiveData;
144  for(iter=m.begin();iter!=m.end();++iter)
145  {
146  OPstream toOtherProc(Pstream::blocking, iter->first, sizeof(label));
147 
148  toOtherProc << iter->second.size();
149  }
150 
151  for(iter=m.begin();iter!=m.end();++iter)
152  {
153  IPstream fromOtherProc(Pstream::blocking, iter->first, sizeof(label));
154 
155  label s;
156  fromOtherProc >> s;
157 
158  if( s != 0 )
159  receiveData.insert(iter->first);
160  }
161 
162  if( commsType == Pstream::blocking )
163  {
164  //- start with blocking type of send and received operation
165 
166  //- send data to other processors
167  for(iter=m.begin();iter!=m.end();++iter)
168  {
169  const ListType& dts = iter->second;
170 
171  if( dts.size() == 0 )
172  continue;
173 
174  OPstream toOtherProc
175  (
177  iter->first,
178  dts.byteSize()
179  );
180  toOtherProc << dts;
181  }
182 
183  //- receive data from other processors
184  for(iter=m.begin();iter!=m.end();++iter)
185  {
186  if( !receiveData.found(iter->first) )
187  continue;
188 
189  IPstream fromOtherProc(Pstream::blocking, iter->first);
190  data.appendFromStream(fromOtherProc);
191  }
192  }
193  else if( commsType == Pstream::scheduled )
194  {
195  //- start with scheduled data transfer
196  //- this type of transfer is intended for long messages because
197  //- it does not require any buffer
198 
199  //- receive data from processors with lower ids
200  for(iter=m.begin();iter!=m.end();++iter)
201  {
202  if( iter->first >= Pstream::myProcNo() )
203  continue;
204  if( !receiveData.found(iter->first) )
205  continue;
206 
207  //List<T> receive;
208  IPstream fromOtherProc(Pstream::scheduled, iter->first);
209  //fromOtherProc >> receive;
210  data.appendFromStream(fromOtherProc);
211 
212  //forAll(receive, i)
213  // data.append(receive[i]);
214  }
215 
216  //- send data to processors with greater ids
217  for(iter=m.begin();iter!=m.end();++iter)
218  {
219  if( iter->first <= Pstream::myProcNo() )
220  continue;
221 
222  const ListType& dts = iter->second;
223 
224  if( dts.size() == 0 )
225  continue;
226 
227  OPstream toOtherProc
228  (
230  iter->first,
231  dts.byteSize()
232  );
233 
234  toOtherProc << dts;
235  }
236 
237  //- receive data from processors with greater ids
238  typename std::map<label, ListType>::const_reverse_iterator riter;
239  for(riter=m.rbegin();riter!=m.rend();++riter)
240  {
241  if( riter->first <= Pstream::myProcNo() )
242  continue;
243  if( !receiveData.found(riter->first) )
244  continue;
245 
246  IPstream fromOtherProc(Pstream::scheduled, riter->first);
247  //List<T> receive;
248  //fromOtherProc >> receive;
249  data.appendFromStream(fromOtherProc);
250 
251  //forAll(receive, i)
252  // data.append(receive[i]);
253  }
254 
255  //- send data to processors with lower ids
256  for(riter=m.rbegin();riter!=m.rend();++riter)
257  {
258  if( riter->first >= Pstream::myProcNo() )
259  continue;
260 
261  const ListType& dts = riter->second;
262 
263  if( dts.size() == 0 )
264  continue;
265 
266  OPstream toOtherProc
267  (
269  riter->first,
270  dts.byteSize()
271  );
272 
273  toOtherProc << dts;
274  }
275  }
276  else
277  {
279  (
280  "template<class T, class ListType>"
281  "void exchangeMap"
282  "(const std::map<label, ListType>& m, LongList<T>& data,"
283  " const Pstream::commsTypes commsType)"
284  ) << "Unknown communication type" << exit(FatalError);
285  }
286 
287  # ifdef DEBUGExchangeMap
288  labelList nReceived(Pstream::nProcs(), 0);
289  for(iter=m.begin();iter!=m.end();++iter)
290  nReceived[iter->first] += iter->second.size();
291 
292  reduce(nReceived, sumOp<labelList>());
293 
294  if( nReceived[Pstream::myProcNo()] != data.size() )
295  FatalError << "Invalid data read!" << abort(FatalError);
296  # endif
297 }
298 
299 template<class T, class ListType>
300 void exchangeMap
301 (
302  const std::map<label, ListType>& m,
303  std::map<label, List<T> >&mOut
304 )
305 {
306  mOut.clear();
307 
308  if( !contiguous<T>() )
309  FatalError << "Data is not contigous" << exit(FatalError);
310 
311  typename std::map<label, ListType>::const_iterator iter;
312 
313  //- send data to other processors
314  for(iter=m.begin();iter!=m.end();++iter)
315  {
316  const ListType& dataToSend = iter->second;
317 
318  OPstream toOtherProc
319  (
321  iter->first,
322  dataToSend.byteSize()
323  );
324  toOtherProc << dataToSend;
325  }
326 
327  //- receive data from other processors
328  for(iter=m.begin();iter!=m.end();++iter)
329  {
330  mOut.insert(std::make_pair(iter->first, List<T>()));
331  List<T>& dataToReceive = mOut[iter->first];
332 
333  IPstream fromOtherProc(Pstream::blocking, iter->first);
334  fromOtherProc >> dataToReceive;
335  }
336 }
337 
338 template<class RowType, template<class ListTypeArg> class GraphType>
340 (
341  const GraphType<RowType>& addr,
342  GraphType<RowType>& reverseAddr
343 )
344 {
345  reverseAddr.setSize(0);
346  labelList nAppearances;
347 
348  label minRow(INT_MAX), maxRow(0);
349  DynList<std::map<label, DynList<labelPair, 64> > > dataForOtherThreads;
350 
351  # ifdef USE_OMP
352  # pragma omp parallel
353  # endif
354  {
355  //- find min and max entry in the graph
356  //- they are used for assigning ranges of values local for each process
357  label localMinRow(minRow), localMaxRow(0);
358  # ifdef USE_OMP
359  # pragma omp for schedule(guided)
360  # endif
361  forAll(addr, rowI)
362  {
363  const RowType& row = addr[rowI];
364 
365  forAll(row, i)
366  {
367  localMaxRow = Foam::max(localMaxRow, row[i]);
368  localMinRow = Foam::min(localMinRow, row[i]);
369  }
370  }
371 
372  ++localMaxRow;
373 
374  # ifdef USE_OMP
375  # pragma omp critical
376  # endif
377  {
378  minRow = Foam::min(minRow, localMinRow);
379  maxRow = Foam::max(maxRow, localMaxRow);
380  }
381 
382  # ifdef USE_OMP
383  # pragma omp barrier
384 
385  //- allocate the rows of the transposed graph
386  # pragma omp master
387  # endif
388  {
389  # ifdef USE_OMP
390  dataForOtherThreads.setSize(omp_get_num_threads());
391  # else
392  dataForOtherThreads.setSize(1);
393  # endif
394  nAppearances.setSize(maxRow);
395  reverseAddr.setSize(maxRow);
396  }
397 
398  # ifdef USE_OMP
399  const label nProcs = omp_get_num_threads();
400  const label procNo = omp_get_thread_num();
401  # else
402  const label nProcs = 1;
403  const label procNo = 0;
404  # endif
405 
406  # ifdef USE_OMP
407  # pragma omp barrier
408 
409  //- initialise appearances
410  # pragma omp for
411  # endif
412  for(label i=0;i<maxRow;++i)
413  nAppearances[i] = 0;
414 
415  const label range = (maxRow - minRow) / nProcs + 1;
416  const label localMin = minRow + procNo * range;
417  const label localMax = Foam::min(localMin + range, maxRow);
418 
419  //- find the number of appearances of each element in the original graph
420  # ifdef USE_OMP
421  # pragma omp for
422  # endif
423  forAll(addr, rowI)
424  {
425  const RowType& row = addr[rowI];
426 
427  forAll(row, j)
428  {
429  const label entryI = row[j];
430 
431  const label procI = (entryI - minRow) / range;
432  if( procI != procNo )
433  {
434  dataForOtherThreads[procNo][procI].append
435  (
436  labelPair(entryI, rowI)
437  );
438  }
439  else
440  {
441  ++nAppearances[entryI];
442  }
443  }
444  }
445 
446  # ifdef USE_OMP
447  # pragma omp barrier
448  # endif
449 
450  //- count the appearances which are not local to the processor
451  for(label i=0;i<nProcs;++i)
452  {
453  const std::map<label, DynList<labelPair, 64> >& data =
454  dataForOtherThreads[i];
455 
456  std::map<label, DynList<labelPair, 64> >::const_iterator it =
457  data.find(procNo);
458 
459  if( it != data.end() )
460  {
461  const DynList<labelPair, 64>& entries = it->second;
462 
463  forAll(entries, j)
464  ++nAppearances[entries[j].first()];
465  }
466  }
467 
468  # ifdef USE_OMP
469  # pragma omp barrier
470  # endif
471 
472  //- allocate reverse matrix
473  for(label i=localMin;i<localMax;++i)
474  {
475  reverseAddr[i].setSize(nAppearances[i]);
476  nAppearances[i] = 0;
477  }
478 
479  //- start filling reverse addressing graph
480  //- update data from processors with smaller labels
481  for(label i=0;i<procNo;++i)
482  {
483  const std::map<label, DynList<labelPair, 64> >& data =
484  dataForOtherThreads[i];
485  std::map<label, DynList<labelPair, 64> >::const_iterator it =
486  data.find(procNo);
487 
488  if( it != data.end() )
489  {
490  const DynList<labelPair, 64>& entries = it->second;
491 
492  forAll(entries, j)
493  {
494  const label entryI = entries[j].first();
495  reverseAddr[entryI][nAppearances[entryI]++] =
496  entries[j].second();
497  }
498  }
499  }
500 
501  //- update data local to the processor
502  # ifdef USE_OMP
503  # pragma omp for
504  # endif
505  forAll(addr, rowI)
506  {
507  const RowType& row = addr[rowI];
508 
509  forAll(row, j)
510  {
511  const label entryI = row[j];
512 
513  if( (entryI >= localMin) && (entryI < localMax) )
514  {
515  reverseAddr[entryI][nAppearances[entryI]++] = rowI;
516  }
517  }
518  }
519 
520  //- update data from the processors with higher labels
521  for(label i=procNo+1;i<nProcs;++i)
522  {
523  const std::map<label, DynList<labelPair, 64> >& data =
524  dataForOtherThreads[i];
525  std::map<label, DynList<labelPair, 64> >::const_iterator it =
526  data.find(procNo);
527 
528  if( it != data.end() )
529  {
530  const DynList<labelPair, 64>& entries = it->second;
531 
532  forAll(entries, j)
533  {
534  const label entryI = entries[j].first();
535  reverseAddr[entryI][nAppearances[entryI]++] =
536  entries[j].second();
537  }
538  }
539  }
540  }
541 }
542 
543 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *//
544 
545 } // End namespace help
546 
547 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *//
548 
549 } // End namespace Foam
550 
551 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
Foam::help::whisperReduce
void whisperReduce(const ListType &neis, const scatterOp &sop, gatherOp &gop)
Definition: helperFunctionsPar.C:62
helperFunctionsPar.H
Foam::UPstream::scheduled
@ scheduled
Definition: UPstream.H:67
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:406
Foam::OPstream
Output inter-processor communications stream.
Definition: OPstream.H:50
Foam::localMin
LocalMin-mean differencing scheme class.
Definition: localMin.H:54
Foam::UPstream::nProcs
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:387
Foam::HashSet< label, Hash< label > >
Foam::LongList< T >
Foam::UPstream::blocking
@ blocking
Definition: UPstream.H:66
Foam::reduce
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
Definition: PstreamReduceOps.H:43
Foam::localMax
LocalMax-mean differencing scheme class.
Definition: localMax.H:54
Foam::label
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
error.H
HashSet.H
Foam::FatalError
error FatalError
forAllReverse
#define forAllReverse(list, i)
Reverse loop across all elements in list.
Definition: UList.H:418
Foam::HashTable::found
bool found(const Key &) const
Return true if hashedEntry is found in table.
Definition: HashTable.C:109
Foam::help::reverseAddressingSMP
void reverseAddressingSMP(const GraphType< RowType > &addr, GraphType< RowType > &reverseAddr)
calculates the reverse addressing of the graph by transposing the graph
Definition: helperFunctionsPar.C:340
Foam
Namespace for OpenFOAM.
Definition: combustionModel.C:30
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:131
s
gmvFile<< "tracers "<< particles.size()<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().x()<< " ";}gmvFile<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().y()<< " ";}gmvFile<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().z()<< " ";}gmvFile<< nl;forAll(lagrangianScalarNames, i){ word name=lagrangianScalarNames[i];IOField< scalar > s(IOobject(name, runTime.timeName(), cloud::prefix, mesh, IOobject::MUST_READ, IOobject::NO_WRITE))
Foam::DynList< label >
Foam::graphRow
Definition: graphRow.H:48
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:405
Foam::max
dimensioned< Type > max(const dimensioned< Type > &, const dimensioned< Type > &)
Foam::List::setSize
void setSize(const label)
Reset size of List.
Foam::UPstream::commsTypes
commsTypes
Types of communications.
Definition: UPstream.H:64
range
scalar range
Definition: LISASMDCalcMethod1.H:12
Foam::sumOp
Definition: ops.H:162
Foam::LongList::byteSize
label byteSize() const
Return the binary size in number of characters of the UList.
Definition: LongListI.H:209
Foam::List
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: HashTable.H:59
Foam::DynList::setSize
void setSize(const label)
Reset size of List.
Definition: DynListI.H:263
Foam::help::exchangeMap
void exchangeMap(const std::map< label, ListType > &m, LongList< T > &data, const Pstream::commsTypes commsType)
Definition: helperFunctionsPar.C:129
Foam::HashSet::insert
bool insert(const Key &key)
Insert a new entry.
Definition: HashSet.H:116
Foam::IPstream
Input inter-processor communications stream.
Definition: IPstream.H:50
FatalErrorIn
#define FatalErrorIn(functionName)
Report an error message using Foam::FatalError.
Definition: error.H:313
Foam::List::size
void size(const label)
Override size to be inconsistent with allocated storage.
Foam::dictionary::clear
void clear()
Clear the dictionary.
Definition: dictionary.C:1050
Foam::min
dimensioned< Type > min(const dimensioned< Type > &, const dimensioned< Type > &)
Foam::labelPair
Pair< label > labelPair
Label pair.
Definition: labelPair.H:48
Foam::data
Database for solution data, solver performance and other reduced data.
Definition: data.H:52
DynList.H
labelPair.H
Foam::DynList::append
void append(const T &e)
Append an element at the end of the list.
Definition: DynListI.H:304
Foam::help::combineData
void combineData(const sendOp &sop, combineOp &cop)
Definition: helperFunctionsPar.C:49