Swarm-NG  1.1
io.cpp
Go to the documentation of this file.
1 /*************************************************************************
2  * Copyright (C) 2010 by Mario Juric and the Swarm-NG Development Team *
3  * *
4  * This program is free software; you can redistribute it and/or modify *
5  * it under the terms of the GNU General Public License as published by *
6  * the Free Software Foundation; either version 3 of the License. *
7  * *
8  * This program is distributed in the hope that it will be useful, *
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
11  * GNU General Public License for more details. *
12  * *
13  * You should have received a copy of the GNU General Public License *
14  * along with this program; if not, write to the *
15  * Free Software Foundation, Inc., *
16  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
17  ************************************************************************/
18 
24 #include "../common.hpp"
25 #include "io.hpp"
26 
27 
28 using namespace swarm::log;
29 
30 namespace swarm { namespace query {
31 
32 
33 range_special ALL;
34 range_MIN MIN;
35 range_MAX MAX;
36 
37 
38 const char* UNSORTED_HEADER_FULL = "unsorted_output // Unsorted output file";
39 const char* UNSORTED_HEADER_CHECK = "unsorted_output";
40 const char* SORTED_HEADER_FULL = "T_sorted_output // Output file sorted by time";
41 const char* SORTED_HEADER_CHECK = "T_sorted_output";
42 const char* T_INDEX_CHECK = "T_sorted_index";
43 const char* SYS_INDEX_CHECK = "sys_sorted_index";
44 
46 void get_Tsys(gpulog::logrecord &lr, double &T, int &sys)
47 {
48  //std::cerr << "msgid=" << lr.msgid() << "\n";
49  if(lr.msgid() < 0)
50  {
51  // system-defined events that have no (T,sys) heading
52  T = -1; sys = -1;
53  }
54  else
55  {
56  lr >> T >> sys;
57  }
58 }
59 
61 struct idx_t
62 {
63  const char *ptr; // pointer to this packet in memory
64  int len; // length of this packet (including header and data)
65 
66  void gethdr(double &T, int &sys) const
67  {
68  gpulog::logrecord lr(ptr);
69  get_Tsys(lr, T, sys);
70  }
71 
72  bool operator< (const idx_t &a) const
73  {
74  double Tt, Ta;
75  int syst, sysa;
76  this->gethdr(Tt, syst);
77  a.gethdr(Ta, sysa);
78 
79  return Tt < Ta || (
80  Tt == Ta && syst < sysa || (
81  syst == sysa && this->ptr < a.ptr
82  )
83  );
84  }
85 };
86 
88 struct index_entry_time_cmp
89 {
90  bool operator()(const swarmdb::index_entry &a, const swarmdb::index_entry &b) const { return a.T < b.T || (a.T == b.T && a.sys < b.sys || (a.sys == b.sys && a.offs < b.offs) ); }
91 };
92 
94 struct index_entry_sys_cmp
95 {
96  bool operator()(const swarmdb::index_entry &a, const swarmdb::index_entry &b) const { return a.sys < b.sys || (a.sys == b.sys && a.T < b.T) || (a.T == b.T && a.offs < b.offs); }
97 };
98 
#if 0
// Disabled comparator for a per-body index.
// NOTE(review): as written this is not a strict weak ordering (the final
// T/sys/offs tie-break ignores `body`, and the first clause ignores `sys`);
// rework it before re-enabling it for use with std::sort.
struct index_entry_bod_cmp
{
	bool operator()(const swarmdb::index_entry &a, const swarmdb::index_entry &b) const
	{
		return a.body < b.body
			|| (a.sys == b.sys && a.body == b.body && a.T < b.T )
			|| (a.T == b.T && a.sys == b.sys && a.offs < b.offs);
	}
};
#endif
105 
107 bool get_file_info(uint64_t &timestamp, uint64_t &filesize, const std::string &fn)
108 {
109  struct stat sb;
110  if(stat(fn.c_str(), &sb) == -1)
111  {
112  ERROR(strerror(errno));
113  }
114 #ifdef _WIN32
115  timestamp = sb.st_mtime;
116 #else
117  timestamp = (uint64_t(sb.st_mtim.tv_sec) << 32) + uint64_t(sb.st_mtim.tv_nsec);
118 #endif
119  filesize = sb.st_size;
120 
121  return true;
122 }
123 
125 template<typename Cmp>
126 class index_creator : public index_creator_base
127 {
128 protected:
129  std::string suffix, filetype;
130  std::ofstream out;
131  std::string filename;
132  int nentries;
133 
135 public:
136  index_creator(const std::string &suffix_, const std::string &filetype_) : suffix(suffix_), filetype(filetype_), nentries(0) {}
137 
139  virtual bool start(const std::string &datafile)
140  {
141  filename = datafile + suffix;
142  out.open(filename.c_str());
143  assert(out);
144 
146  uint64_t timestamp, filesize;
147  get_file_info(timestamp, filesize, datafile);
148 
150  swarm::swarm_index_header fh(filetype, timestamp, filesize);
151  out.write((char*)&fh, sizeof(fh));
152  return true;
153  }
155  virtual bool add_entry(uint64_t offs, gpulog::logrecord lr)
156  {
158  ie.offs = offs;
159  get_Tsys(lr, ie.T, ie.sys);
160  out.write((const char *)&ie, sizeof(ie));
161 
162  nentries++;
163  return true;
164  }
165 
167  virtual bool finish()
168  {
169  out.close();
170 
171  // sort by time
172  mmapped_swarm_index_file mm(filename, filetype, MemoryMap::rw);
173  swarmdb::index_entry *begin = (swarmdb::index_entry *)mm.data(), *end = begin + mm.size()/sizeof(swarmdb::index_entry);
174  assert((end - begin) == nentries);
175 
176  std::sort(begin, end, Cmp());
177  #if 1
178  Cmp cmp;
179  for(int i=1; i < nentries; i++)
180  {
181  bool ok = cmp(begin[i-1], begin[i]); // they're less
182  if(!ok) { ok = !cmp(begin[i], begin[i-1]); } // they're equal
183  assert(ok);
184  }
185  #endif
186  return true;
187  }
188 };
189 
191 void swarmdb::index_binary_log_file(std::vector<boost::shared_ptr<index_creator_base> > &ic, const std::string &datafile)
192 {
193  // open datafile
194  gpulog::ilogstream ils(mmdata.data(), mmdata.size());
195  gpulog::logrecord lr;
196 
197  // postprocess (this is where the creator may sort or store the index)
198  for(int i=0; i != ic.size(); i++)
199  {
200  ic[i]->start(datafile);
201  }
202 
203  // stream through the data file
204  while(lr = ils.next())
205  {
206  for(int i=0; i != ic.size(); i++)
207  {
208  ic[i]->add_entry(lr.ptr - mmdata.data(), lr);
209  }
210  }
211 
212  // postprocess (this is where the creator may sort or store the index)
213  for(int i=0; i != ic.size(); i++)
214  {
215  ic[i]->finish();
216  }
217 }
218 
219 /*
220  Implementation note: This implementation will probably barf
221  when log sizes reach a few GB. A better implementation, using merge
222  sort could/should be written.
223 */
224 
226 bool sort_binary_log_file(const std::string &outfn, const std::string &infn)
227 {
228  mmapped_swarm_file mm(infn, UNSORTED_HEADER_CHECK);
229  gpulog::ilogstream ils(mm.data(), mm.size());
230  std::vector<idx_t> idx;
231  gpulog::logrecord lr;
232 
233  // load record information
234  uint64_t datalen = 0;
235  for(int i=0; lr = ils.next(); i++)
236  {
237  idx_t ii;
238  ii.ptr = lr.ptr;
239  ii.len = lr.len();
240  idx.push_back(ii);
241 
242  datalen += ii.len;
243  }
244  assert(datalen == mm.size());
245 
246  // sort
247  std::sort(idx.begin(), idx.end());
248 
249  // output file header
250  std::ofstream out(outfn.c_str());
251  swarm_header fh(SORTED_HEADER_FULL, 0, datalen);
252  out.write((char*)&fh, sizeof(fh));
253 
254  // write out the data
255  for(int i = 0; i != idx.size(); i++)
256  {
257  out.write(idx[i].ptr, idx[i].len);
258  }
259  size_t tp = out.tellp();
260  assert(tp == sizeof(fh) + datalen);
261  out.close();
262 
263  return true;
264 }
265 
267 struct sysinfo
268 {
269  int sys;
270  long flags;
271  const body *bodies;
272 
273  bool operator <(const sysinfo &a) const
274  {
275  return sys < a.sys;
276  }
277 };
278 
280 bool swarmdb::snapshots::next(cpu_ensemble &ens)
281 {
282  // find and load the next snapshot
283  int nbod = -1, sysmax = -1; double Tsnapend;
284  gpulog::logrecord lr;
285  std::set<sysinfo> systems;
286  while(lr = r.next())
287  {
288  // find next EVT_SNAPSHOT
289  if(lr.msgid() != log::EVT_SNAPSHOT) { continue; }
290 
291  // get time & system ID
292  double T; int nbod_tmp; sysinfo si;
293  lr >> T >> si.sys >> si.flags >> nbod_tmp;
294  if(nbod == -1)
295  {
296  Tsnapend = T*(1+Trelerr) + Tabserr;
297  nbod = nbod_tmp;
298  }
299  assert(nbod == nbod_tmp); // all systems must have the same number of planets
300 
301  // reached the end?
302  if(T > Tsnapend)
303  {
304  r.unget();
305  break;
306  }
307 
308  // check if we already have a record of this system
309  if(systems.count(si))
310  {
311  continue;
312  }
313 
314  // load the pointer to bodies
315  lr >> si.bodies;
316  systems.insert(si);
317 
318  sysmax = std::max(si.sys, sysmax);
319  }
320 
321  // return immediately if we've reached the end
322  if(!systems.size()) { return false; }
323 
324  // pack everything to cpu_ensemble structure
325  ens = cpu_ensemble::create(nbod, sysmax+1);
326  // initially, mark everything as inactive
327  for(int sys = 0; sys != ens.nsys(); sys++)
328  {
329  ens.set_inactive(sys);
330  }
331 
332  // populate the systems with data and mark them active
333  for(std::set<sysinfo>::iterator i = systems.begin(); i != systems.end(); i++)
334  {
335  const sysinfo &si = *i;
336  assert(si.sys >= 0 && si.sys < ens.nsys());
337 
338  // per-system data
339  ens.flags(si.sys) = si.flags;
340 
341  // per-body data
342  for(int bod = 0; bod != nbod; bod++)
343  {
344  const body &b = si.bodies[bod];
345  ens.set_body(si.sys, bod, b.mass, b.x, b.y, b.z, b.vx, b.vy, b.vz);
346  ens.time(si.sys) = Tsnapend;
347  }
348  }
349 
350  return true;
351 }
352 
354 swarmdb::snapshots::snapshots(const swarmdb &db_, time_range_t T, double Tabserr_, double Trelerr_)
355  : db(db_), Tabserr(Tabserr_), Trelerr(Trelerr_), r(db.query(ALL, T))
356 {
357 }
358 
360 swarmdb::result::result(const swarmdb &db_, const sys_range_t &sys_, const time_range_t &T_)
361  : db(db_), sys(sys_), T(T_)
362 {
363  using namespace boost;
364 
365  if(sys.first == sys.last || !T)
366  {
367  // find the sys index range
368  swarmdb::index_entry dummy;
369  dummy.sys = sys.first; begin = std::lower_bound(db.idx_sys.begin, db.idx_sys.end, dummy, bind( &index_entry::sys, _1 ) < bind( &index_entry::sys, _2 ));
370  dummy.sys = sys.last; end = std::upper_bound(db.idx_sys.begin, db.idx_sys.end, dummy, bind( &index_entry::sys, _1 ) < bind( &index_entry::sys, _2 ));
371  }
372  else if(T)
373  {
374  // find the first T in the range
375  swarmdb::index_entry dummy;
376  dummy.T = T.first; begin = std::lower_bound(db.idx_time.begin, db.idx_time.end, dummy, bind( &index_entry::T, _1 ) < bind( &index_entry::T, _2 ));
377  dummy.T = T.last; end = std::upper_bound(db.idx_time.begin, db.idx_time.end, dummy, bind( &index_entry::T, _1 ) < bind( &index_entry::T, _2 ));
378  }
379  else
380  {
381  // stream through everything, time first
382  begin = db.idx_time.begin;
383  end = db.idx_time.end;
384  }
385 
386  at = begin;
387  atprev = at;
388 }
389 
390 
#if 0
// Disabled variant of the constructor that also takes a body range and
// initializes a `body` member; its index selection is identical to the
// enabled constructor. Kept for reference only.
swarmdb::result::result(const swarmdb &db_, const sys_range_t &sys_, const body_range_t &body_, const time_range_t &T_) : db(db_), sys(sys_), body(body_), T(T_)
{
	using namespace boost;

	if(sys.first == sys.last || !T)
	{
		// find the sys index range
		swarmdb::index_entry dummy;
		dummy.sys = sys.first; begin = std::lower_bound(db.idx_sys.begin, db.idx_sys.end, dummy, bind( &index_entry::sys, _1 ) < bind( &index_entry::sys, _2 ));
		dummy.sys = sys.last; end = std::upper_bound(db.idx_sys.begin, db.idx_sys.end, dummy, bind( &index_entry::sys, _1 ) < bind( &index_entry::sys, _2 ));
	}
	else if(T)
	{
		// find the first T in the range
		swarmdb::index_entry dummy;
		dummy.T = T.first; begin = std::lower_bound(db.idx_time.begin, db.idx_time.end, dummy, bind( &index_entry::T, _1 ) < bind( &index_entry::T, _2 ));
		dummy.T = T.last; end = std::upper_bound(db.idx_time.begin, db.idx_time.end, dummy, bind( &index_entry::T, _1 ) < bind( &index_entry::T, _2 ));
	}
	else
	{
		// stream through everything, time first
		begin = db.idx_time.begin;
		end = db.idx_time.end;
	}

	at = begin;
	atprev = at;
}
#endif
420 #endif
421 
423 gpulog::logrecord swarmdb::result::next()
424 {
425  if(at < end)
426  {
427  atprev = at;
428  }
429 
430  while(at < end)
431  {
432  if(!T.in(at->T)) { at++; continue; }
433  if(!sys.in(at->sys)) { at++; continue; }
434  // if(!body.in(at->body)) { at++; continue; }
435 
436  return gpulog::logrecord(db.mmdata.data() + (at++)->offs);
437  }
438 
439  static gpulog::header hend(-1, 0);
440  static gpulog::logrecord eof((char *)&hend);
441  return eof;
442 }
443 
444 void swarmdb::result::unget()
445 {
446  at = atprev;
447 }
448 
450 swarmdb::swarmdb(const std::string &datafile)
451 {
452  open(datafile);
453 }
454 
456 void swarmdb::open(const std::string &datafile)
457 {
458  if(access(datafile.c_str(), 04) != 0)
459  {
460  ERROR("Cannot open output file '" + datafile + "'");
461  }
462  this->datafile = datafile;
463 
464  // open the datafile
465  mmdata.open(datafile, SORTED_HEADER_CHECK);
466 
467  // open the indexes
468  open_indexes();
469 }
470 
472 void swarmdb::open_indexes(bool force_recreate)
473 {
474  std::vector<boost::shared_ptr<index_creator_base> > ic;
475  std::string fn;
476 
477  // auto-create indices if needed
478  bool tidx_open = !force_recreate && open_index(idx_time, datafile, ".time.idx", "T_sorted_index");
479  if(!tidx_open)
480  {
481  boost::shared_ptr<index_creator_base> ii(new index_creator<index_entry_time_cmp>(".time.idx", "T_sorted_index"));
482  ic.push_back( ii );
483  }
484  bool sysidx_open = !force_recreate && open_index(idx_sys, datafile, ".sys.idx", "sys_sorted_index");
485  if(!sysidx_open)
486  {
487  boost::shared_ptr<index_creator_base> ii(new index_creator<index_entry_sys_cmp>(".sys.idx", "sys_sorted_index"));
488  ic.push_back( ii );
489  }
490 
491  // create indices if needed
492  if(!ic.empty())
493  {
494  index_binary_log_file(ic, datafile);
495  }
496 
497  // open index maps
498  if(!tidx_open && !open_index(idx_time, datafile, ".time.idx", T_INDEX_CHECK ))
499  {
500  ERROR("Cannot open index file '" + datafile + ".time.idx'");
501  }
502  if(!sysidx_open && !open_index(idx_sys, datafile, ".sys.idx", SYS_INDEX_CHECK))
503  {
504  ERROR("Cannot open index file '" + datafile + ".sys.idx'");
505  }
506 }
507 
509 bool swarmdb::open_index(index_handle &h, const std::string &datafile, const std::string &suffix, const std::string &filetype)
510 {
511  std::string filename = datafile + suffix;
512 
513  // check for existence
514  if(access(filename.c_str(), 04) != 0) { return false; }
515 
516 
517 
518  // check for modification timestamp and size
519  h.mm.open(filename.c_str(), filetype);
520  uint64_t timestamp, filesize;
521  get_file_info(timestamp, filesize, datafile);
522 
524  timestamp = h.mm.hdr().timestamp;
525 
526  if(h.mm.hdr().datafile_size != filesize || h.mm.hdr().timestamp != timestamp)
527  {
528  std::cerr << "Index " << filename << " not up to date. Will regenerate.\n";
529  return false;
530  }
531 
532  h.begin = (swarmdb::index_entry *)h.mm.data();
533  h.end = h.begin + h.mm.size()/sizeof(swarmdb::index_entry);
534 
535  return true;
536 }
537 
538 
539 } } // namespace query::swarm