dune-common  2.8.0
variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5 
6 #if HAVE_MPI
7 
8 #include <algorithm>
9 #include <cassert>
10 #include <cstddef>
11 #include <functional>
12 #include <map>
13 #include <memory>
14 #include <utility>
15 #include <vector>
16 #include <algorithm>
17 
18 #include <mpi.h>
19 
22 
35 namespace Dune
36 {
37 
38 namespace
39 {
/**
 * @brief A fixed-capacity buffer of items of type T with a read/write cursor.
 *
 * Items are written and read sequentially through a single position that is
 * advanced by write() and read() and rewound by reset(). No bounds checking
 * is performed; use hasSpaceForItems() before writing.
 *
 * @tparam T         The type of the buffered items.
 * @tparam Allocator Not used by the implementation; kept for interface
 *                   compatibility.
 */
template<class T, class Allocator=std::allocator<T> >
class MessageBuffer
{
public:
  /**
   * @brief Allocates storage for size items and places the cursor at 0.
   * @param size The number of items the buffer can hold.
   */
  explicit MessageBuffer(int size)
    : buffer_(new T[size]), size_(size), position_(0)
  {}

  /**
   * @brief Creates a buffer with the same capacity and cursor position as o.
   *
   * Only the capacity and the position are taken over; the stored items
   * are NOT copied.
   * @param o The buffer whose capacity and position are duplicated.
   */
  explicit MessageBuffer(const MessageBuffer& o)
    : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
  {}

  /**
   * @brief Copy assignment is disabled.
   *
   * A member-wise assignment would make two objects own the same storage.
   */
  MessageBuffer& operator=(const MessageBuffer&) = delete;

  /**
   * @brief Stores an item at the current position and advances the cursor.
   * @param data The item to store.
   */
  void write(const T& data)
  {
    buffer_[position_++]=data;
  }

  /**
   * @brief Reads the item at the current position and advances the cursor.
   * @param[out] data Receives the item.
   */
  void read(T& data)
  {
    data=buffer_[position_++];
  }

  /** @brief Moves the cursor back to the first item. */
  void reset()
  {
    position_=0;
  }

  /** @brief Whether the cursor has reached the capacity of the buffer. */
  bool finished() const
  {
    return position_==size_;
  }

  /**
   * @brief Whether noItems more items fit behind the current position.
   * @param noItems The number of items that should be stored.
   */
  bool hasSpaceForItems(int noItems) const
  {
    return position_+noItems<=size_;
  }

  /** @brief The capacity of the buffer in items. */
  std::size_t size() const
  {
    return size_;
  }

  /** @brief Grants access to the underlying storage (e.g. for MPI calls). */
  operator T*()
  {
    return buffer_.get();
  }

private:
  /** @brief The owned storage; released automatically on destruction. */
  std::unique_ptr<T[]> buffer_;
  /** @brief The capacity in items. */
  std::size_t size_;
  /** @brief The index of the next item to read or write. */
  std::size_t position_;
};
145 
149 class InterfaceTracker
150 {
151 public:
157  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
158  bool allocateSizes=false)
159  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
160  {
161  if(allocateSizes)
162  {
163  sizes_.resize(info.size());
164  }
165  }
166 
170  void moveToNextIndex()
171  {
172  index_++;
173  assert(index_<=interface_.size());
174  skipZeroIndices();
175  }
180  void increment(std::size_t i)
181  {
182  index_+=i;
183  assert(index_<=interface_.size());
184  }
189  bool finished() const
190  {
191  return index_==interface_.size();
192  }
193 
194  void skipZeroIndices()
195  {
196  // skip indices with size zero!
197  while(sizes_.size() && index_!=interface_.size() &&!size())
198  ++index_;
199  }
200 
205  std::size_t index() const
206  {
207  return interface_[index_];
208  }
212  std::size_t size() const
213  {
214  assert(sizes_.size());
215  return sizes_[index_];
216  }
220  std::size_t* getSizesPointer()
221  {
222  return &sizes_[0];
223  }
228  bool empty() const
229  {
230  return !interface_.size();
231  }
232 
237  std::size_t indicesLeft() const
238  {
239  return interface_.size()-index_;
240  }
244  std::size_t fixedSize;
248  int rank() const
249  {
250  return rank_;
251  }
255  std::size_t offset() const
256  {
257  return index_;
258  }
259 private:
261  int rank_;
263  std::size_t index_;
265  InterfaceInformation interface_;
266  std::vector<std::size_t> sizes_;
267 };
268 
269 
270 } // end unnamed namespace
271 
309 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
311 {
312 public:
317  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
318  std::less<int>,
319  typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
320 
321 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
328  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
329  : maxBufferSize_(32768), interface_(&inf)
330  {
331  MPI_Comm_dup(comm, &communicator_);
332  }
338  : maxBufferSize_(32768), interface_(&inf.interfaces())
339  {
340  MPI_Comm_dup(inf.communicator(), &communicator_);
341  }
342 #else
349  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
350  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
351  interface_(&inf)
352  {
353  MPI_Comm_dup(comm, &communicator_);
354  }
359  VariableSizeCommunicator(const Interface& inf)
360  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
361  interface_(&inf.interfaces())
362  {
363  MPI_Comm_dup(inf.communicator(), &communicator_);
364  }
365 #endif
372  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
373  : maxBufferSize_(max_buffer_size), interface_(&inf)
374  {
375  MPI_Comm_dup(comm, &communicator_);
376  }
377 
383  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
384  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
385  {
386  MPI_Comm_dup(inf.communicator(), &communicator_);
387  }
388 
390  {
391  MPI_Comm_free(&communicator_);
392  }
393 
399  maxBufferSize_ = other.maxBufferSize_;
400  interface_ = other.interface_;
401  MPI_Comm_dup(other.communicator_, &communicator_);
402  }
403 
409  if(this == &other) // don't do anything if objects are the same
410  return *this;
411 
412  maxBufferSize_ = other.maxBufferSize_;
413  interface_ = other.interface_;
414  MPI_Comm_free(&communicator_);
415  MPI_Comm_dup(other.communicator_, &communicator_);
416 
417  return *this;
418  }
419 
439  template<class DataHandle>
440  void forward(DataHandle& handle)
441  {
442  communicate<true>(handle);
443  }
444 
464  template<class DataHandle>
465  void backward(DataHandle& handle)
466  {
467  communicate<false>(handle);
468  }
469 
470 private:
471  template<bool FORWARD, class DataHandle>
472  void communicateSizes(DataHandle& handle,
473  std::vector<InterfaceTracker>& recv_trackers);
474 
481  template<bool forward,class DataHandle>
482  void communicate(DataHandle& handle);
492  template<bool FORWARD, class DataHandle>
493  void setupInterfaceTrackers(DataHandle& handle,
494  std::vector<InterfaceTracker>& send_trackers,
495  std::vector<InterfaceTracker>& recv_trackers);
503  template<bool FORWARD, class DataHandle>
504  void communicateFixedSize(DataHandle& handle);
512  template<bool FORWARD, class DataHandle>
513  void communicateVariableSize(DataHandle& handle);
520  std::size_t maxBufferSize_;
528  const InterfaceMap* interface_;
534  MPI_Comm communicator_;
535 };
536 
538 namespace
539 {
543 template<class DataHandle>
544 class SizeDataHandle
545 {
546 public:
547  typedef std::size_t DataType;
548 
549  SizeDataHandle(DataHandle& data,
550  std::vector<InterfaceTracker>& trackers)
551  : data_(data), trackers_(trackers), index_()
552  {}
553  bool fixedsize()
554  {
555  return true;
556  }
557  std::size_t size([[maybe_unused]] std::size_t i)
558  {
559  return 1;
560  }
561  template<class B>
562  void gather(B& buf, int i)
563  {
564  buf.write(data_.size(i));
565  }
566  void setReceivingIndex(std::size_t i)
567  {
568  index_=i;
569  }
570  std::size_t* getSizesPointer()
571  {
572  return trackers_[index_].getSizesPointer();
573  }
574 
575 private:
576  DataHandle& data_;
577  std::vector<InterfaceTracker>& trackers_;
578  int index_;
579 };
580 
/**
 * @brief Fallback for data handles that do not track a receiving index.
 *
 * Intentionally does nothing.
 */
template<class T>
void setReceivingIndex([[maybe_unused]] T& handle, [[maybe_unused]] int index)
{
  // Nothing to record for generic handles.
}
584 
585 template<class T>
586 void setReceivingIndex(SizeDataHandle<T>& t, int i)
587 {
588  t.setReceivingIndex(i);
589 }
590 
591 
597 template<bool FORWARD>
598 struct InterfaceInformationChooser
599 {
603  static const InterfaceInformation&
604  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
605  {
606  return info.first;
607  }
608 
612  static const InterfaceInformation&
613  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
614  {
615  return info.second;
616  }
617 };
618 
619 template<>
620 struct InterfaceInformationChooser<false>
621 {
622  static const InterfaceInformation&
623  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
624  {
625  return info.second;
626  }
627 
628  static const InterfaceInformation&
629  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
630  {
631  return info.first;
632  }
633 };
634 
640 template<class DataHandle>
641 struct PackEntries
642 {
643 
644  int operator()(DataHandle& handle, InterfaceTracker& tracker,
645  MessageBuffer<typename DataHandle::DataType>& buffer,
646  [[maybe_unused]] int i) const
647  {
648  return operator()(handle,tracker,buffer);
649  }
650 
658  int operator()(DataHandle& handle, InterfaceTracker& tracker,
659  MessageBuffer<typename DataHandle::DataType>& buffer) const
660  {
661  if(tracker.fixedSize) // fixed size if variable is >0!
662  {
663 
664  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
665  for(std::size_t i=0; i< noIndices; ++i)
666  {
667  handle.gather(buffer, tracker.index());
668  tracker.moveToNextIndex();
669  }
670  return noIndices*tracker.fixedSize;
671  }
672  else
673  {
674  int packed=0;
675  tracker.skipZeroIndices();
676  while(!tracker.finished())
677  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
678  {
679  handle.gather(buffer, tracker.index());
680  packed+=handle.size(tracker.index());
681  tracker.moveToNextIndex();
682  }
683  else
684  break;
685  return packed;
686  }
687  }
688 };
689 
695 template<class DataHandle>
696 struct UnpackEntries{
697 
705  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
706  MessageBuffer<typename DataHandle::DataType>& buffer,
707  int count=0)
708  {
709  if(tracker.fixedSize) // fixed size if variable is >0!
710  {
711  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
712 
713  for(std::size_t i=0; i< noIndices; ++i)
714  {
715  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
716  tracker.moveToNextIndex();
717  }
718  return tracker.finished();
719  }
720  else
721  {
722  assert(count);
723  for(int unpacked=0;unpacked<count;)
724  {
725  assert(!tracker.finished());
726  assert(buffer.hasSpaceForItems(tracker.size()));
727  handle.scatter(buffer, tracker.index(), tracker.size());
728  unpacked+=tracker.size();
729  tracker.moveToNextIndex();
730  }
731  return tracker.finished();
732  }
733  }
734 };
735 
736 
740 template<class DataHandle>
741 struct UnpackSizeEntries{
742 
750  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
751  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
752  {
753  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
754  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
755  handle.getSizesPointer()+tracker.offset());
756  tracker.increment(noIndices);
757  return noIndices;
758  }
759  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
760  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
761  {
762  return operator()(handle,tracker,buffer);
763  }
764 };
765 
773 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
774  std::vector<MPI_Request>& send_requests,
775  std::vector<InterfaceTracker>& recv_trackers,
776  std::vector<MPI_Request>& recv_requests,
777  MPI_Comm communicator)
778 {
779  typedef std::vector<InterfaceTracker>::iterator TIter;
780  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
781 
782  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
783  ++iter, ++mIter)
784  {
785  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
786  iter->rank(), 933881, communicator, &(*mIter));
787  }
788 
789  // Send our size to all neighbours using non-blocking synchronous communication.
790  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
791  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
792  iter!=end;
793  ++iter, ++mIter1)
794  {
795  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
796  iter->rank(), 933881, communicator, &(*mIter1));
797  }
798 }
799 
800 
805 template<class DataHandle>
806 struct SetupSendRequest{
807  void operator()(DataHandle& handle,
808  InterfaceTracker& tracker,
809  MessageBuffer<typename DataHandle::DataType>& buffer,
810  MPI_Request& request,
811  MPI_Comm comm) const
812  {
813  buffer.reset();
814  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
815  // Skip indices of zero size.
816  while(!tracker.finished() && !handle.size(tracker.index()))
817  tracker.moveToNextIndex();
818  if(size)
819  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
820  tracker.rank(), 933399, comm, &request);
821  }
822 };
823 
824 
829 template<class DataHandle>
830 struct SetupRecvRequest{
831  void operator()(DataHandle& /*handle*/,
832  InterfaceTracker& tracker,
833  MessageBuffer<typename DataHandle::DataType>& buffer,
834  MPI_Request& request,
835  MPI_Comm comm) const
836  {
837  buffer.reset();
838  if(tracker.indicesLeft())
839  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
840  tracker.rank(), 933399, comm, &request);
841  }
842 };
843 
847 template<class DataHandle>
848 struct NullPackUnpackFunctor
849 {
850  int operator()(DataHandle&, InterfaceTracker&,
851  MessageBuffer<typename DataHandle::DataType>&, int)
852  {
853  return 0;
854  }
855  int operator()(DataHandle&, InterfaceTracker&,
856  MessageBuffer<typename DataHandle::DataType>&)
857  {
858  return 0;
859  }
860 };
861 
876 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
877 std::size_t checkAndContinue(DataHandle& handle,
878  std::vector<InterfaceTracker>& trackers,
879  std::vector<MPI_Request>& requests,
880  std::vector<MPI_Request>& requests2,
881  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
882  MPI_Comm comm,
883  BufferFunctor buffer_func,
884  CommunicationFunctor comm_func,
885  bool valid=true,
886  bool getCount=false)
887 {
888  std::size_t size=requests.size();
889  std::vector<MPI_Status> statuses(size);
890  int no_completed;
891  std::vector<int> indices(size, -1); // the indices for which the communication finished.
892 
893  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
894  indices.resize(no_completed);
895  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
896  index!=end; ++index)
897  {
898  InterfaceTracker& tracker=trackers[*index];
899  setReceivingIndex(handle, *index);
900  if(getCount)
901  {
902  // Get the number of entries received
903  int count;
904  MPI_Get_count(&(statuses[index-indices.begin()]),
906  &count);
907  // Communication completed, we can reuse the buffers, e.g. unpack or repack
908  buffer_func(handle, tracker, buffers[*index], count);
909  }else
910  buffer_func(handle, tracker, buffers[*index]);
911  tracker.skipZeroIndices();
912  if(!tracker.finished()){
913  // Maybe start another communication.
914  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
915  tracker.skipZeroIndices();
916  if(valid)
917  --no_completed; // communication not finished, decrement counter for finished ones.
918  }
919  }
920  return no_completed;
921 
922 }
923 
933 template<class DataHandle>
934 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
935  std::vector<InterfaceTracker>& trackers,
936  std::vector<MPI_Request>& size_requests,
937  std::vector<MPI_Request>& data_requests,
938  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
939  MPI_Comm comm)
940 {
941  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
942  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
943 }
944 
953 template<class DataHandle>
954 std::size_t checkSendAndContinueSending(DataHandle& handle,
955  std::vector<InterfaceTracker>& trackers,
956  std::vector<MPI_Request>& requests,
957  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
958  MPI_Comm comm)
959 {
960  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
961  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
962 }
963 
972 template<class DataHandle>
973 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
974  std::vector<InterfaceTracker>& trackers,
975  std::vector<MPI_Request>& requests,
976  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
977  MPI_Comm comm)
978 {
979  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
980  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
981  true, !handle.fixedsize());
982 }
983 
984 
985 bool validRecvRequests(const std::vector<MPI_Request> reqs)
986 {
987  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
988  i!=end; ++i)
989  if(*i!=MPI_REQUEST_NULL)
990  return true;
991  return false;
992 }
993 
1004 template<class DataHandle, class Functor>
1005 std::size_t setupRequests(DataHandle& handle,
1006  std::vector<InterfaceTracker>& trackers,
1007  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1008  std::vector<MPI_Request>& requests,
1009  const Functor& setupFunctor,
1010  MPI_Comm communicator)
1011 {
1012  typedef typename std::vector<InterfaceTracker>::iterator TIter;
1013  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1014  biter=buffers.begin();
1015  typename std::vector<MPI_Request>::iterator riter=requests.begin();
1016  std::size_t complete=0;
1017  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1018  {
1019  setupFunctor(handle, *titer, *biter, *riter, communicator);
1020  complete+=titer->finished();
1021  }
1022  return complete;
1023 }
1024 } // end unnamed namespace
1025 
1026 template<class Allocator>
1027 template<bool FORWARD, class DataHandle>
1028 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1029  std::vector<InterfaceTracker>& send_trackers,
1030  std::vector<InterfaceTracker>& recv_trackers)
1031 {
1032  if(interface_->size()==0)
1033  return;
1034  send_trackers.reserve(interface_->size());
1035  recv_trackers.reserve(interface_->size());
1036 
1037  int fixedsize=0;
1038  if(handle.fixedsize())
1039  ++fixedsize;
1040 
1041 
1042  typedef typename InterfaceMap::const_iterator IIter;
1043  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1044  {
1045 
1046  if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1047  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1048  assert(!handle.fixedsize()||fixedsize>0);
1049  send_trackers.push_back(InterfaceTracker(inf->first,
1050  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1051  recv_trackers.push_back(InterfaceTracker(inf->first,
1052  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1053  }
1054 }
1055 
1056 template<class Allocator>
1057 template<bool FORWARD, class DataHandle>
1058 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1059 {
1060  std::vector<MPI_Request> size_send_req(interface_->size());
1061  std::vector<MPI_Request> size_recv_req(interface_->size());
1062 
1063  std::vector<InterfaceTracker> send_trackers;
1064  std::vector<InterfaceTracker> recv_trackers;
1065  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1066  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1067 
1068  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1069  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1070  typedef typename DataHandle::DataType DataType;
1071  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1072  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1073 
1074 
1075  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1076  SetupSendRequest<DataHandle>(), communicator_);
1077 
1078  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1079  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1080 
1081  // Skip empty interfaces.
1082  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1083  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1084  if(i->empty())
1085  --no_to_recv;
1086  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1087  if(i->empty())
1088  --no_to_send;
1089 
1090  while(no_size_to_recv+no_to_send+no_to_recv)
1091  {
1092  // Receive the fixedsize and setup receives accordingly
1093  if(no_size_to_recv)
1094  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1095  data_recv_req, recv_buffers,
1096  communicator_);
1097 
1098  // Check send completion and initiate other necessary sends
1099  if(no_to_send)
1100  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1101  send_buffers, communicator_);
1102  if(validRecvRequests(data_recv_req))
1103  // Receive data and setup new unblocking receives if necessary
1104  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1105  recv_buffers, communicator_);
1106  }
1107 
1108  // Wait for completion of sending the size.
1109  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1110  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1111 
1112 }
1113 
1114 template<class Allocator>
1115 template<bool FORWARD, class DataHandle>
1116 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1117  std::vector<InterfaceTracker>& data_recv_trackers)
1118 {
1119  std::vector<InterfaceTracker> send_trackers;
1120  std::vector<InterfaceTracker> recv_trackers;
1121  std::size_t size = interface_->size();
1122  std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1123  std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1124  std::vector<MessageBuffer<std::size_t> >
1125  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1126  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1127  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1128  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1129  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1130  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1131  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1132  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1133 
1134  // Count valid requests that we have to wait for.
1135  auto valid_req_func =
1136  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1137 
1138  auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1139  valid_req_func);
1140  auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1141  valid_req_func);
1142 
1143  while(size_to_send+size_to_recv)
1144  {
1145  if(size_to_send)
1146  size_to_send -=
1147  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1148  send_buffers, communicator_);
1149  if(size_to_recv)
1150  // Could have done this using checkSendAndContinueSending
1151  // But the call below is more efficient as UnpackSizeEntries
1152  // uses std::copy.
1153  size_to_recv -=
1154  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1155  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1156  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1157  }
1158 }
1159 
1160 template<class Allocator>
1161 template<bool FORWARD, class DataHandle>
1162 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1163 {
1164 
1165  std::vector<InterfaceTracker> send_trackers;
1166  std::vector<InterfaceTracker> recv_trackers;
1167  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1168 
1169  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1170  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1171  typedef typename DataHandle::DataType DataType;
1172  std::vector<MessageBuffer<DataType> >
1173  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1174  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1175 
1176  communicateSizes<FORWARD>(handle, recv_trackers);
1177  // Setup requests for sending and receiving.
1178  setupRequests(handle, send_trackers, send_buffers, send_requests,
1179  SetupSendRequest<DataHandle>(), communicator_);
1180  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1181  SetupRecvRequest<DataHandle>(), communicator_);
1182 
1183  // Determine number of valid requests.
1184  auto valid_req_func =
1185  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1186 
1187  auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1188  valid_req_func);
1189  auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1190  valid_req_func);
1191  while(no_to_send+no_to_recv)
1192  {
1193  // Check send completion and initiate other necessary sends
1194  if(no_to_send)
1195  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1196  send_buffers, communicator_);
1197  if(no_to_recv)
1198  // Receive data and setup new unblocking receives if necessary
1199  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1200  recv_buffers, communicator_);
1201  }
1202 }
1203 
1204 template<class Allocator>
1205 template<bool FORWARD, class DataHandle>
1206 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1207 {
1208  if( interface_->size() == 0)
1209  // Simply return as otherwise we will index an empty container
1210  // either for MPI_Wait_all or MPI_Test_some.
1211  return;
1212 
1213  if(handle.fixedsize())
1214  communicateFixedSize<FORWARD>(handle);
1215  else
1216  communicateVariableSize<FORWARD>(handle);
1217 }
1218 } // end namespace Dune
1219 
1220 #endif // HAVE_MPI
1221 
1222 #endif
Traits classes for mapping types onto MPI_Datatype.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:244
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: parallel/interface.hh:415
Dune namespace.
Definition: alignedallocator.hh:11
auto min(const AlignedNumber< T, align > &a, const AlignedNumber< T, align > &b)
Definition: debugalign.hh:434
static MPI_Datatype getType()
Definition: mpitraits.hh:46
size_t size() const
Get the number of entries in the interface.
Definition: parallel/interface.hh:106
Communication interface between remote and local indices.
Definition: parallel/interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:311
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:383
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:465
~VariableSizeCommunicator()
Definition: variablesizecommunicator.hh:389
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:372
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:398
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:440
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assigns a communicator.
Definition: variablesizecommunicator.hh:408
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:319
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:328
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:337
Provides classes for building the communication interface between remote indices.