Disk ARchive  2.7.1
Full featured and portable backup and archiving tool
parallel_block_compressor.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // dar - disk archive - a backup/restoration program
3 // Copyright (C) 2002-2021 Denis Corbin
4 //
5 // This program is free software; you can redistribute it and/or
6 // modify it under the terms of the GNU General Public License
7 // as published by the Free Software Foundation; either version 2
8 // of the License, or (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program; if not, write to the Free Software
17 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 //
19 // to contact the author, see the AUTHOR file
20 /*********************************************************************/
21 
25 
98 
99 #ifndef PARALLEL_BLOCK_COMPRESSOR_HPP
100 #define PARALLEL_BLOCK_COMPRESSOR_HPP
101 
102 #include "../my_config.h"
103 
104 #include "infinint.hpp"
105 #include "crypto_segment.hpp"
106 #include "heap.hpp"
107 #include "compress_module.hpp"
108 #include "proto_compressor.hpp"
109 
110 #include <libthreadar/libthreadar.hpp>
111 
112 namespace libdar
113 {
114 
117 
118 
120 
/// the different flags used to communicate between the threads held by
/// the parallel_block_compressor class

/// \note the numerical value of each flag is set explicitly
enum class compressor_block_flags
{
    data = 0,
    eof_die = 1,
    error = 2,
    worker_error = 3
};
122 
// forward declarations of the classes implementing the sub-threads of
// class parallel_block_compressor; they are listed here in the order
// their definitions appear further down in this file, after the
// definition of parallel_block_compressor

class zip_below_write;
class zip_below_read;
class zip_worker;
129 
130 
132  //
133  // parallel_block_compressor class, which holds the sub-threads
134  //
135  //
136 
137  class parallel_block_compressor: public proto_compressor
138  {
139  public:
145 
146  parallel_block_compressor(U_I num_workers,
147  std::unique_ptr<compress_module> block_zipper,
148  generic_file & compressed_side,
149  U_I uncompressed_bs = default_uncompressed_block_size);
150  // compressed_side is not owned by the object and will remains
151  // after the objet destruction
152 
153  parallel_block_compressor(const parallel_block_compressor & ref) = delete;
154  parallel_block_compressor(parallel_block_compressor && ref) noexcept = delete;
155  parallel_block_compressor & operator = (const parallel_block_compressor & ref) = delete;
156  parallel_block_compressor & operator = (parallel_block_compressor && ref) noexcept = delete;
157  ~parallel_block_compressor();
158 
159  // inherited from proto_compressor
160 
161  virtual compression get_algo() const override { return suspended? compression::none : zipper->get_algo(); };
162  virtual void suspend_compression() override;
163  virtual void resume_compression() override;
164  virtual bool is_compression_suspended() const override { return suspended; };
165 
166  // inherited from generic file
167 
168  virtual bool skippable(skippability direction, const infinint & amount) override;
169  virtual bool skip(const infinint & pos) override;
170  virtual bool skip_to_eof() override;
171  virtual bool skip_relative(S_I x) override;
172  virtual bool truncatable(const infinint & pos) const override;
173  virtual infinint get_position() const override;
174 
175  protected :
176  virtual void inherited_read_ahead(const infinint & amount) override { if(!suspended) run_read_threads(); };
177  virtual U_I inherited_read(char *a, U_I size) override;
178  virtual void inherited_write(const char *a, U_I size) override;
179  virtual void inherited_truncate(const infinint & pos) override;
180  virtual void inherited_sync_write() override;
181  virtual void inherited_flush_read() override { stop_read_threads(); };
182  virtual void inherited_terminate() override;
183 
184  private:
185 
186  // the local fields
187 
188  U_I num_w;
189  std::unique_ptr<compress_module> zipper;
190  generic_file *compressed;
191  U_I uncompressed_block_size;
192  bool suspended;
193  bool running_threads;
194  std::unique_ptr<crypto_segment> curwrite;
195  std::deque<std::unique_ptr<crypto_segment> > lus_data;
196  std::deque<signed int> lus_flags;
197  bool reof;
198 
199 
200  // inter-thread data structure
201 
202  std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > disperse;
203  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > rassemble;
204  std::shared_ptr<heap<crypto_segment> > tas;
205 
206 
207  // the subthreads
208 
209  std::unique_ptr<zip_below_read> reader;
210  std::unique_ptr<zip_below_write> writer;
211  std::deque<zip_worker> travailleurs;
212 
213 
214  // private methods
215 
216  void send_flag_to_workers(compressor_block_flags flag);
217  void stop_threads();
218  void stop_read_threads();
219  void stop_write_threads();
220  void run_threads();
221  void run_read_threads();
222  void run_write_threads();
223  compressor_block_flags purge_ratelier_up_to_non_data();
224 
225 
226  // static methods
227 
228  static U_I get_ratelier_size(U_I num_workers) { return num_workers + num_workers/2; };
229  static U_I get_heap_size(U_I num_workers);
230  };
231 
232 
233 
235  //
236  // zip_below_write class/sub-thread
237  //
238  //
239 
240  class zip_below_write: public libthreadar::thread
241  {
242  public:
243  zip_below_write(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & source,
244  generic_file *dest,
245  const std::shared_ptr<heap<crypto_segment> > & xtas,
246  U_I num_workers);
247 
248 
252  bool exception_pending() const { return error; };
253 
255  void reset();
256 
257  protected:
258  virtual void inherited_run() override;
259 
260  private:
261  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > src;
262  generic_file *dst;
263  std::shared_ptr<heap<crypto_segment> > tas;
264  U_I num_w;
265  bool error;
266  U_I ending;
267  std::deque<std::unique_ptr<crypto_segment> > data;
268  std::deque<signed int> flags;
269  libthreadar::mutex get_pos;
270  infinint current_position;
271 
272  void work();
273  void pop_front() { tas->put(std::move(data.front())); data.pop_front(); flags.pop_front(); };
274  };
275 
276 
277 
278 
280  //
281  // zip_below_read class/sub-thread
282  //
283  //
284 
285 
286 
287  class zip_below_read: public libthreadar::thread
288  {
289  public:
290  zip_below_read(generic_file *source,
291  const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dest,
292  const std::shared_ptr<heap<crypto_segment> > & xtas,
293  U_I num_workers);
294 
296  void do_stop() { should_i_stop = true; };
297 
299  void reset();
300 
301  protected:
302  virtual void inherited_run() override;
303 
304  private:
305  generic_file *src;
306  const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dst;
307  const std::shared_ptr<heap<crypto_segment> > & tas;
308  U_I num_w;
309  std::unique_ptr<crypto_segment> ptr;
310  bool should_i_stop;
311 
312 
313  void work();
314  void push_flag_to_all_workers(compressor_block_flags flag);
315  };
316 
317 
318 
319 
321  //
322  // zip_worker class/sub-thread
323  //
324  //
325 
326 
327 
328  class zip_worker: public libthreadar::thread
329  {
330  public:
331  zip_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
332  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_size,
333  std::unique_ptr<compress_module> && ptr,
334  bool compress);
335 
336  protected:
337  virtual void inherited_run() override;
338 
339  private:
340  std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
341  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
342  std::unique_ptr<compress_module> compr;
343  bool do_compress;
344  bool error;
345  std::unique_ptr<crypto_segment> transit;
346  unsigned int transit_slot;
347 
348  void work();
349  };
350 
351 
353 
354 } // end of namespace
355 
356 
357 #endif
358 
generic_file(gf_mode m)
main constructor
provides abstracted interface of per-block compression/decompression
defines unit block of information ciphered at once
compression
the different compression algorithm available
Definition: compression.hpp:46
@ none
no compression
compressor_block_flags
the different flags used to communicate between threads held by parallel_block_compressor class
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bits integers) or infinint
libdar namespace encapsulates all libdar symbols
Definition: archive.hpp:47
abstracted ancestor class for compressor and parallel_compressor classes