Swarm-NG  1.1
log2db.cpp
Go to the documentation of this file.
1 /*************************************************************************
2  * Copyright (C) 2012 by Saleh Dindar and the Swarm-NG Development Team *
3  * *
4  * This program is free software; you can redistribute it and/or modify *
5  * it under the terms of the GNU General Public License as published by *
6  * the Free Software Foundation; either version 3 of the License. *
7  * *
8  * This program is distributed in the hope that it will be useful, *
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
11  * GNU General Public License for more details. *
12  * *
13  * You should have received a copy of the GNU General Public License *
14  * along with this program; if not, write to the *
15  * Free Software Foundation, Inc., *
16  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
17  ************************************************************************/
18 
#include <signal.h>
#include <unistd.h>

#include <cassert>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <limits>
#include <new>
#include <stdexcept>
#include <string>

#include <boost/program_options.hpp>
#include <boost/program_options/positional_options.hpp>
#include <db_cxx.h>

#include "swarm/swarm.h"
#include "swarm/query.hpp"
#include "binary_reader.hpp"
37 
38 using namespace swarm;
39 using namespace std;
40 
41 namespace po = boost::program_options;
42 using boost::bind;
43 using boost::shared_ptr;
44 using gpulog::logrecord;
45 
// Maximum number of log records converted in one run (--number).
int recordsLimit = 1000;

// Verbosity level: set by --verbose, forced to -1 by --quiet.
int DEBUG_LEVEL = 0;

// Berkeley DB cache size of the primary database in bytes (64 MiB);
// each secondary index is opened with a quarter of it.
const int CACHESIZE = 64 * 1024 * 1024;

// Integer type used for record numbers and byte offsets in the input file.
typedef long long idx_t;

// Primary key of the log database.
// The struct is written into the database as raw bytes, so its member
// order and member types define the on-disk key format: do not reorder.
struct logdb_primary_key {
    double time;       // event time; quiet NaN when the event has no time
    int    system_id;  // system id; -1 when the event has none
    int    event_id;   // log message id of the record
    idx_t  recno;      // running record number, keeps otherwise equal keys distinct
};
58 
59 
60 po::variables_map argvars_map;
61 string inputFileName, outputFileName;
62 // Number to start with record number, it shouldn't be
63 // very significant since it is only used to remove
64 // duplicates
65 idx_t start_recno = 0;
66 // Absolute position in the input file where the conversion should
67 // begin
68 idx_t starting_position = 0;
69 
70 void parse_commandline_and_config(int argc, char* argv[]){
71 
72  po::options_description desc("Usage:\n \tlog2db [options]\nOptions");
73  desc.add_options()
74  ("input,i", po::value<std::string>(), "Binary log file from swarm binary_writer")
75  ("output,o", po::value<std::string>(), "Name of the database output file")
76  ("number,n", po::value<int>(), "Number of records to convert")
77  ("verbose,v", po::value<int>(), "Verbosity level")
78  ("position,p", po::value<idx_t>(), "Absolute position in the input file where the conversion should begin (in bytes)")
79  ("recno,r", po::value<idx_t>(), "Starting record number")
80  ("dump,d", "Dump all the records up to the number")
81  ("quiet,q", "Suppress all messages")
82  ("help,h", "Help message")
83  ;
84 
85 
86 
87  po::variables_map &vm = argvars_map;
88  po::store(po::command_line_parser(argc, argv).
89  options(desc).run(), vm);
90  po::notify(vm);
91 
93  //
94  if (vm.count("help")) { std::cout << desc << "\n"; exit(1); }
95  if (vm.count("verbose") ) DEBUG_LEVEL = vm["verbose"].as<int>();
96  if (vm.count("quiet") ) DEBUG_LEVEL = -1;
97 
98  if(vm.count("input"))
99  inputFileName = vm["input"].as<string>();
100  else{
101  cerr << "Name of input file is required" << endl;
102  exit(2);
103  }
104 
105  if(vm.count("output"))
106  outputFileName = vm["output"].as<string>();
107  else{
108  cerr << "Name of output file is required" << endl;
109  exit(2);
110  }
111 
112  if(vm.count("number"))
113  recordsLimit = vm["number"].as<int>();
114 
115  if(vm.count("recno"))
116  start_recno = vm["recno"].as<idx_t>();
117 
118  if(vm.count("position"))
119  starting_position = vm["position"].as<idx_t>();
120 
121 
122 }
123 
124 
131 template<typename T>
132 void put_in_dbt(const T& t, Dbt* data){
133  data->set_flags(DB_DBT_APPMALLOC);
134  data->set_size(sizeof(T));
135  data->set_data(new T(t));
136 }
137 
138 
139 bool extract_from_ptr(void* ptr, size_t size, double& time, int& sys){
140  logrecord l((char*)ptr);
141  assert(l.len() == size);
142 
143  // Based on the implementation in query.cpp
144  switch(l.msgid()){
145  case 1: case 2: case 11: case 15: case 16:
146  l >> time >> sys;
147  return true;
148 
149  default:
150  sys = -1;
151  time = numeric_limits<double>::quiet_NaN();
152  return false;
153  }
154 
155 }
156 
166 int lr_extract_sysid(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
167  logdb_primary_key& pkey = *(logdb_primary_key*) key->get_data();
168  put_in_dbt(pkey.system_id, result);
169  return 0;
170 }
171 
172 int lr_extract_evtid(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
173  logdb_primary_key& pkey = *(logdb_primary_key*) key->get_data();
174  put_in_dbt(pkey.event_id, result);
175  return 0;
176 }
177 
178 int lr_extract_time(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
179  logdb_primary_key& pkey = *(logdb_primary_key*) key->get_data();
180  put_in_dbt(pkey.time , result);
181  return 0;
182 }
183 
190 int compare_logdb_primary_key(DB* db, const DBT *k1, const DBT* k2){
191  const size_t len = sizeof(logdb_primary_key);
192  if(k1->size < k2->size)
193  return -1;
194  else if(k1->size > k2->size)
195  return 1;
196  else{
197  if( (k1->size == len) && (k2->size == len) ) {
198 
199  logdb_primary_key& a = *(logdb_primary_key*)(k1->data);
200  logdb_primary_key& b = *(logdb_primary_key*)(k2->data);
201 
202  if(a.time < b.time) return -1;
203  else if(a.time > b.time) return 1;
204  else if(a.system_id < b.system_id) return -1;
205  else if(a.system_id > b.system_id) return 1;
206  else if(a.event_id < b.event_id) return -1;
207  else if(a.event_id > b.event_id) return 1;
208  else if(a.recno < b.recno) return -1;
209  else if(a.recno > b.recno) return 1;
210  else return 0;
211  }else{
212  return 0;
213  }
214  }
215 }
216 int compare_time(DB* db, const DBT *k1, const DBT* k2){
217  if(k1->size < k2->size)
218  return -1;
219  else if(k1->size > k2->size)
220  return 1;
221  else{
222  if( (k1->size == 8) && (k2->size == 8) ) {
223  double& a = *(double*)(k1->data);
224  double& b = *(double*)(k2->data);
225  if(a < b) return -1;
226  else if(a > b) return 1;
227  else return 0;
228  }else{
229  return 0;
230  }
231  }
232 }
233 
234 
// Set by sigTERM_handler when SIGINT or SIGTERM arrives; main checks it to
// stop the conversion loop and to choose its exit status.
// volatile sig_atomic_t is the object type the C and C++ standards guarantee
// a signal handler may safely write (a plain bool is not).
volatile sig_atomic_t interruption_received = 0;
266 
267 
268 
273 void sigTERM_handler(int signum) {
274  printf("Received signal %d. interrupting execution\n", signum);
275  if(interruption_received)
276  exit(2);
277  else
278  interruption_received = true;
279 }
280 
282 int main(int argc, char* argv[]){
283  parse_commandline_and_config(argc,argv);
284 
285  signal(SIGTERM, sigTERM_handler);
286  signal(SIGINT, sigTERM_handler);
287 
288  ifstream input;
289  Db primary(NULL, 0), system_idx(NULL, 0), time_idx(NULL, 0), event_idx(NULL, 0);
290  binary_reader input_reader(input);
291  // Open the binary file, we should be able to
292  input.open(inputFileName.c_str(), ios::binary);
293  if(!input_reader.validate())
294  throw std::runtime_error("The input file is not valid");
295 
296 
297 
298  cout << "Starting to convert at position " << starting_position <<
299  " with record number " << start_recno << endl;
300 
301  idx_t current_recno = start_recno;
302 
303  // When starting_position is not specified, we shouldn't
304  // go back to zero, it is incorrect as the header is already
305  // read.
306  if(starting_position > 0)
307  input_reader.seek( starting_position );
308 
309  // Open the database files: primary, system_index, time_index, event_index
310  // associate the indices with the primary
311 
312 /* DbEnv dbenv(0);
313  dbenv.set_cachesize(0, 5 * 1024 * 1024 , 0);
314  dbenv.set_data_dir(".");
315  dbenv.open(".", DB_CREATE | DB_INIT_LOG |
316  DB_INIT_LOCK | DB_INIT_MPOOL |
317  DB_INIT_TXN , 0);*/
318  primary.set_cachesize(0,CACHESIZE,0);
319  primary.set_bt_compare(compare_logdb_primary_key);
320  primary.open(NULL, (outputFileName+".p.db").c_str(), NULL, DB_BTREE, DB_CREATE, 0);
321 
322  // Open up the system index database, it has to support
323  // duplicates and it is given a smaller cache size
324  system_idx.set_cachesize(0,CACHESIZE/4,0);
325  system_idx.set_flags(DB_DUP | DB_DUPSORT);
326  system_idx.open(NULL, (outputFileName+".sys.db").c_str(), NULL, DB_BTREE, DB_CREATE , 0);
327 
328  // Open up the time index database, it has to support
329  // duplicates because our index is not a unique index and
330  // it takes a smaller cache size
331  time_idx.set_cachesize(0,CACHESIZE/4,0);
332  time_idx.set_flags(DB_DUP | DB_DUPSORT);
333  time_idx.set_bt_compare(compare_time);
334  time_idx.open(NULL, (outputFileName+".time.db").c_str(), NULL, DB_BTREE, DB_CREATE , 0);
335 
336  event_idx.set_cachesize(0,CACHESIZE/4,0);
337  event_idx.set_flags(DB_DUP | DB_DUPSORT);
338  event_idx.open(NULL, (outputFileName+".evt.db").c_str(), NULL, DB_BTREE, DB_CREATE , 0);
339 
340  // Associate the primary table with the indices
341  // the lr_extract_* is the function that defines
342  // the indexing scheme
343  primary.associate(NULL, &system_idx, &lr_extract_sysid, DB_IMMUTABLE_KEY);
344  primary.associate(NULL, &time_idx , &lr_extract_time , DB_IMMUTABLE_KEY);
345  primary.associate(NULL, &event_idx , &lr_extract_evtid, DB_IMMUTABLE_KEY);
346 
347 
348  logdb_primary_key pkey;
349  Dbt key(&pkey,sizeof(pkey)),data;
350 
351 
352  for(int i=0; (i < recordsLimit) && !interruption_received; i++) {
353  // read one record from binary file
354  logrecord l = input_reader.next();
355 
356  /* if(argvars_map.count("dump") > 0) {
357  output_record(std::cout, l);
358  std::cout << std::endl;
359  }*/
360 
361  // insert it into the primary database, the secondary indices are automatically populated.
362  if(l){
363 
364  // form the primary key
365  pkey.event_id = l.msgid();
366  extract_from_ptr((void*)l.ptr, l.len(), pkey.time, pkey.system_id);
367  pkey.recno = current_recno;
368 
369  data.set_data((void*)l.ptr);
370  data.set_size(l.len());
371  primary.put(NULL,&key,&data,0);
372 
373  current_recno += 1;
374  }else{
375  break;
376  }
377 
378  }
379  cout << "Processed the input file up to position " << input_reader.tellg() <<
380  " and record number " << current_recno << endl;
381 
382  // Close all the databases
383  primary.close(0);
384  system_idx.close(0);
385  time_idx.close(0);
386  event_idx.close(0);
387  //dbenv.close(0);
388 
389  // Close the binary file
390  input.close();
391 
392  if(interruption_received)
393  exit(1);
394  else
395  exit(0);
396 
397 }
398 
399 
400 
401