3 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
4 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
44 template<
class T,
class Allocator=std::allocator<T> >
52 explicit MessageBuffer(
int size)
53 : buffer_(new T[size]), size_(size), position_(0)
59 explicit MessageBuffer(
const MessageBuffer& o)
60 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
72 void write(
const T& data)
74 buffer_[position_++]=data;
83 data=buffer_[position_++];
102 return position_==size_;
110 bool hasSpaceForItems(
int noItems)
112 return position_+noItems<=size_;
118 std::size_t size()
const
143 std::size_t position_;
149 class InterfaceTracker
157 InterfaceTracker(
int rank, InterfaceInformation info, std::size_t fixedsize=0,
158 bool allocateSizes=
false)
159 :
fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
163 sizes_.resize(info.size());
170 void moveToNextIndex()
173 assert(index_<=interface_.size());
180 void increment(std::size_t i)
183 assert(index_<=interface_.size());
189 bool finished()
const
191 return index_==interface_.size();
194 void skipZeroIndices()
197 while(sizes_.size() && index_!=interface_.size() &&!size())
205 std::size_t index()
const
207 return interface_[index_];
212 std::size_t size()
const
214 assert(sizes_.size());
215 return sizes_[index_];
220 std::size_t* getSizesPointer()
230 return !interface_.size();
237 std::size_t indicesLeft()
const
239 return interface_.size()-index_;
255 std::size_t offset()
const
265 InterfaceInformation interface_;
266 std::vector<std::size_t> sizes_;
309 template<
class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
317 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
319 typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > >
InterfaceMap;
321 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
329 : maxBufferSize_(32768), interface_(&inf)
331 MPI_Comm_dup(comm, &communicator_);
338 : maxBufferSize_(32768), interface_(&inf.interfaces())
350 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
353 MPI_Comm_dup(comm, &communicator_);
360 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
361 interface_(&inf.interfaces())
363 MPI_Comm_dup(inf.communicator(), &communicator_);
373 : maxBufferSize_(max_buffer_size), interface_(&inf)
375 MPI_Comm_dup(comm, &communicator_);
384 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
391 MPI_Comm_free(&communicator_);
399 maxBufferSize_ = other.maxBufferSize_;
400 interface_ = other.interface_;
401 MPI_Comm_dup(other.communicator_, &communicator_);
412 maxBufferSize_ = other.maxBufferSize_;
413 interface_ = other.interface_;
414 MPI_Comm_free(&communicator_);
415 MPI_Comm_dup(other.communicator_, &communicator_);
439 template<
class DataHandle>
442 communicate<true>(handle);
464 template<
class DataHandle>
467 communicate<false>(handle);
471 template<
bool FORWARD,
class DataHandle>
472 void communicateSizes(DataHandle& handle,
473 std::vector<InterfaceTracker>& recv_trackers);
481 template<
bool forward,
class DataHandle>
482 void communicate(DataHandle& handle);
492 template<
bool FORWARD,
class DataHandle>
493 void setupInterfaceTrackers(DataHandle& handle,
494 std::vector<InterfaceTracker>& send_trackers,
495 std::vector<InterfaceTracker>& recv_trackers);
503 template<
bool FORWARD,
class DataHandle>
504 void communicateFixedSize(DataHandle& handle);
512 template<
bool FORWARD,
class DataHandle>
513 void communicateVariableSize(DataHandle& handle);
520 std::size_t maxBufferSize_;
534 MPI_Comm communicator_;
543 template<
class DataHandle>
547 typedef std::size_t DataType;
549 SizeDataHandle(DataHandle& data,
550 std::vector<InterfaceTracker>& trackers)
551 : data_(data), trackers_(trackers), index_()
557 std::size_t size([[maybe_unused]] std::size_t i)
562 void gather(B& buf,
int i)
564 buf.write(data_.size(i));
566 void setReceivingIndex(std::size_t i)
570 std::size_t* getSizesPointer()
572 return trackers_[index_].getSizesPointer();
577 std::vector<InterfaceTracker>& trackers_;
582 void setReceivingIndex(T&,
int)
586 void setReceivingIndex(SizeDataHandle<T>& t,
int i)
588 t.setReceivingIndex(i);
597 template<
bool FORWARD>
598 struct InterfaceInformationChooser
603 static const InterfaceInformation&
604 getSend(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
612 static const InterfaceInformation&
613 getReceive(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
620 struct InterfaceInformationChooser<false>
622 static const InterfaceInformation&
623 getSend(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
628 static const InterfaceInformation&
629 getReceive(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
640 template<
class DataHandle>
644 int operator()(DataHandle& handle, InterfaceTracker& tracker,
645 MessageBuffer<typename DataHandle::DataType>& buffer,
646 [[maybe_unused]]
int i)
const
648 return operator()(handle,tracker,buffer);
658 int operator()(DataHandle& handle, InterfaceTracker& tracker,
659 MessageBuffer<typename DataHandle::DataType>& buffer)
const
661 if(tracker.fixedSize)
664 std::size_t noIndices=
std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
665 for(std::size_t i=0; i< noIndices; ++i)
667 handle.gather(buffer, tracker.index());
668 tracker.moveToNextIndex();
670 return noIndices*tracker.fixedSize;
675 tracker.skipZeroIndices();
676 while(!tracker.finished())
677 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
679 handle.gather(buffer, tracker.index());
680 packed+=handle.size(tracker.index());
681 tracker.moveToNextIndex();
695 template<
class DataHandle>
696 struct UnpackEntries{
705 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
706 MessageBuffer<typename DataHandle::DataType>& buffer,
709 if(tracker.fixedSize)
711 std::size_t noIndices=
std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
713 for(std::size_t i=0; i< noIndices; ++i)
715 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
716 tracker.moveToNextIndex();
718 return tracker.finished();
723 for(
int unpacked=0;unpacked<count;)
725 assert(!tracker.finished());
726 assert(buffer.hasSpaceForItems(tracker.size()));
727 handle.scatter(buffer, tracker.index(), tracker.size());
728 unpacked+=tracker.size();
729 tracker.moveToNextIndex();
731 return tracker.finished();
740 template<
class DataHandle>
741 struct UnpackSizeEntries{
750 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
751 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer)
const
753 std::size_t noIndices=
std::min(buffer.size(), tracker.indicesLeft());
754 std::copy(
static_cast<std::size_t*
>(buffer),
static_cast<std::size_t*
>(buffer)+noIndices,
755 handle.getSizesPointer()+tracker.offset());
756 tracker.increment(noIndices);
759 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
760 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer,
int)
const
762 return operator()(handle,tracker,buffer);
773 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
774 std::vector<MPI_Request>& send_requests,
775 std::vector<InterfaceTracker>& recv_trackers,
776 std::vector<MPI_Request>& recv_requests,
777 MPI_Comm communicator)
779 typedef std::vector<InterfaceTracker>::iterator TIter;
780 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
782 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
786 iter->rank(), 933881, communicator, &(*mIter));
790 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
791 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
796 iter->rank(), 933881, communicator, &(*mIter1));
805 template<
class DataHandle>
806 struct SetupSendRequest{
807 void operator()(DataHandle& handle,
808 InterfaceTracker& tracker,
809 MessageBuffer<typename DataHandle::DataType>& buffer,
810 MPI_Request& request,
814 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
816 while(!tracker.finished() && !handle.size(tracker.index()))
817 tracker.moveToNextIndex();
820 tracker.rank(), 933399, comm, &request);
829 template<
class DataHandle>
830 struct SetupRecvRequest{
831 void operator()(DataHandle& ,
832 InterfaceTracker& tracker,
833 MessageBuffer<typename DataHandle::DataType>& buffer,
834 MPI_Request& request,
838 if(tracker.indicesLeft())
840 tracker.rank(), 933399, comm, &request);
847 template<
class DataHandle>
848 struct NullPackUnpackFunctor
850 int operator()(DataHandle&, InterfaceTracker&,
851 MessageBuffer<typename DataHandle::DataType>&,
int)
855 int operator()(DataHandle&, InterfaceTracker&,
856 MessageBuffer<typename DataHandle::DataType>&)
876 template<
class DataHandle,
class BufferFunctor,
class CommunicationFunctor>
877 std::size_t checkAndContinue(DataHandle& handle,
878 std::vector<InterfaceTracker>& trackers,
879 std::vector<MPI_Request>& requests,
880 std::vector<MPI_Request>& requests2,
881 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
883 BufferFunctor buffer_func,
884 CommunicationFunctor comm_func,
888 std::size_t size=requests.size();
889 std::vector<MPI_Status> statuses(size);
891 std::vector<int> indices(size, -1);
893 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
894 indices.resize(no_completed);
895 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
898 InterfaceTracker& tracker=trackers[*index];
899 setReceivingIndex(handle, *index);
904 MPI_Get_count(&(statuses[index-indices.begin()]),
908 buffer_func(handle, tracker, buffers[*index], count);
910 buffer_func(handle, tracker, buffers[*index]);
911 tracker.skipZeroIndices();
912 if(!tracker.finished()){
914 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
915 tracker.skipZeroIndices();
933 template<
class DataHandle>
934 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
935 std::vector<InterfaceTracker>& trackers,
936 std::vector<MPI_Request>& size_requests,
937 std::vector<MPI_Request>& data_requests,
938 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
941 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
942 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(),
false);
953 template<
class DataHandle>
954 std::size_t checkSendAndContinueSending(DataHandle& handle,
955 std::vector<InterfaceTracker>& trackers,
956 std::vector<MPI_Request>& requests,
957 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
960 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
961 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
972 template<
class DataHandle>
973 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
974 std::vector<InterfaceTracker>& trackers,
975 std::vector<MPI_Request>& requests,
976 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
979 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
980 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
981 true, !handle.fixedsize());
985 bool validRecvRequests(
const std::vector<MPI_Request> reqs)
987 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
989 if(*i!=MPI_REQUEST_NULL)
1004 template<
class DataHandle,
class Functor>
1005 std::size_t setupRequests(DataHandle& handle,
1006 std::vector<InterfaceTracker>& trackers,
1007 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1008 std::vector<MPI_Request>& requests,
1009 const Functor& setupFunctor,
1010 MPI_Comm communicator)
1012 typedef typename std::vector<InterfaceTracker>::iterator TIter;
1013 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1014 biter=buffers.begin();
1015 typename std::vector<MPI_Request>::iterator riter=requests.begin();
1016 std::size_t complete=0;
1017 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1019 setupFunctor(handle, *titer, *biter, *riter, communicator);
1020 complete+=titer->finished();
1026 template<
class Allocator>
1027 template<
bool FORWARD,
class DataHandle>
1028 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1029 std::vector<InterfaceTracker>& send_trackers,
1030 std::vector<InterfaceTracker>& recv_trackers)
1032 if(interface_->
size()==0)
1034 send_trackers.reserve(interface_->
size());
1035 recv_trackers.reserve(interface_->
size());
1038 if(handle.fixedsize())
1042 typedef typename InterfaceMap::const_iterator IIter;
1043 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1046 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1047 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1048 assert(!handle.fixedsize()||fixedsize>0);
1049 send_trackers.push_back(InterfaceTracker(inf->first,
1050 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1051 recv_trackers.push_back(InterfaceTracker(inf->first,
1052 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1056 template<
class Allocator>
1057 template<
bool FORWARD,
class DataHandle>
1058 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1060 std::vector<MPI_Request> size_send_req(interface_->
size());
1061 std::vector<MPI_Request> size_recv_req(interface_->
size());
1063 std::vector<InterfaceTracker> send_trackers;
1064 std::vector<InterfaceTracker> recv_trackers;
1065 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1066 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1068 std::vector<MPI_Request> data_send_req(interface_->
size(), MPI_REQUEST_NULL);
1069 std::vector<MPI_Request> data_recv_req(interface_->
size(), MPI_REQUEST_NULL);
1070 typedef typename DataHandle::DataType DataType;
1071 std::vector<MessageBuffer<DataType> > send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1072 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1075 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1076 SetupSendRequest<DataHandle>(), communicator_);
1078 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1079 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->
size();
1082 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1083 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1086 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1090 while(no_size_to_recv+no_to_send+no_to_recv)
1094 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1095 data_recv_req, recv_buffers,
1100 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1101 send_buffers, communicator_);
1102 if(validRecvRequests(data_recv_req))
1104 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1105 recv_buffers, communicator_);
1110 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1114 template<
class Allocator>
1115 template<
bool FORWARD,
class DataHandle>
1116 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1117 std::vector<InterfaceTracker>& data_recv_trackers)
1119 std::vector<InterfaceTracker> send_trackers;
1120 std::vector<InterfaceTracker> recv_trackers;
1121 std::size_t size = interface_->
size();
1122 std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1123 std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1124 std::vector<MessageBuffer<std::size_t> >
1125 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1126 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1127 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1128 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1129 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1130 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1131 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1132 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1135 auto valid_req_func =
1136 [](
const MPI_Request& req) {
return req != MPI_REQUEST_NULL; };
1138 auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1140 auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1143 while(size_to_send+size_to_recv)
1147 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1148 send_buffers, communicator_);
1154 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1155 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1156 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1160 template<
class Allocator>
1161 template<
bool FORWARD,
class DataHandle>
1162 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1165 std::vector<InterfaceTracker> send_trackers;
1166 std::vector<InterfaceTracker> recv_trackers;
1167 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1169 std::vector<MPI_Request> send_requests(interface_->
size(), MPI_REQUEST_NULL);
1170 std::vector<MPI_Request> recv_requests(interface_->
size(), MPI_REQUEST_NULL);
1171 typedef typename DataHandle::DataType DataType;
1172 std::vector<MessageBuffer<DataType> >
1173 send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1174 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1176 communicateSizes<FORWARD>(handle, recv_trackers);
1178 setupRequests(handle, send_trackers, send_buffers, send_requests,
1179 SetupSendRequest<DataHandle>(), communicator_);
1180 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1181 SetupRecvRequest<DataHandle>(), communicator_);
1184 auto valid_req_func =
1185 [](
const MPI_Request& req) {
return req != MPI_REQUEST_NULL;};
1187 auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1189 auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1191 while(no_to_send+no_to_recv)
1195 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1196 send_buffers, communicator_);
1199 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1200 recv_buffers, communicator_);
1204 template<
class Allocator>
1205 template<
bool FORWARD,
class DataHandle>
1206 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1208 if( interface_->
size() == 0)
1213 if(handle.fixedsize())
1214 communicateFixedSize<FORWARD>(handle);
1216 communicateVariableSize<FORWARD>(handle);
Traits classes for mapping types onto MPI_Datatype.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:244
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: parallel/interface.hh:415
Dune namespace.
Definition: alignedallocator.hh:11
auto min(const AlignedNumber< T, align > &a, const AlignedNumber< T, align > &b)
Definition: debugalign.hh:434
static MPI_Datatype getType()
Definition: mpitraits.hh:46
size_t size() const
Get the number of entries in the interface.
Definition: parallel/interface.hh:106
Communication interface between remote and local indices.
Definition: parallel/interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:311
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:383
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:465
~VariableSizeCommunicator()
Definition: variablesizecommunicator.hh:389
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:372
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:398
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:440
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assigns a communicator.
Definition: variablesizecommunicator.hh:408
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:319
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:328
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:337
Provides classes for building the communication interface between remote indices.