Swarm-NG  1.1
binary_reader.cpp
1 #include "binary_reader.hpp"
2 
3 #include <exception>
4 
5 using namespace swarm;
6 using namespace std;
7 using gpulog::logrecord;
8 
9 const int MAX_RECORD_SIZE = 4096;
10 const int BUFFER_SIZE = 128*1024*1024;
11 const int PAGESIZE = 4096;
12 
17 binary_reader::binary_reader(istream& input)
18  :_input(input)
19 {
20  buffer_end = current = buffer_begin = new char[BUFFER_SIZE];
21 }
22 
23 ptrdiff_t binary_reader::tellg(){
24  return _input.tellg() - (buffer_end - current);
25 }
26 
27 void binary_reader::seek(ptrdiff_t absolute_position){
28  ptrdiff_t page_offset = ((absolute_position + PAGESIZE-1)/PAGESIZE)*PAGESIZE;
29  ptrdiff_t mode_offset = absolute_position - page_offset;
30 
31  _input.seekg( page_offset, ios_base::beg );
32 
33  readChunk( mode_offset );
34 }
35 
36 void binary_reader::readChunk(ptrdiff_t current_offset){
37  // read the chuck
38  _input.read(buffer_begin,BUFFER_SIZE);
39  buffer_end = buffer_begin + _input.gcount();
40 
41  // set the current pointer
42  current = buffer_begin + current_offset;
43 
44  // std::cerr << "Read " << (buffer_end-buffer_begin) <<
45  // " bytes and current at " << (current-buffer_begin) << std::endl;
46 }
47 
48 void binary_reader::readNextChunk(){
49  // lseek back to before of the current
50  ptrdiff_t back_offset = buffer_end - current;
51  ptrdiff_t page_offset = ((back_offset + PAGESIZE-1)/PAGESIZE)*PAGESIZE;
52  if(page_offset > 0) _input.seekg( -page_offset , ios_base::cur);
53 
54  readChunk(page_offset - back_offset);
55 }
56 
57 
58 bool binary_reader::ensure(const size_t& len){
59  if(current + len < buffer_end){
60  return true;
61  } else if(!_input.eof()){
62  readNextChunk();
63  return true;
64  } else {
65  return false;
66  }
67 }
68 
69 char* binary_reader::readBytes(const size_t& len){
70  if(ensure(len)){
71  char* ret = current;
72  current += len;
73  return ret;
74  } else {
75  return 0;
76  }
77 }
78 
80  swarm_header defh(query::UNSORTED_HEADER_FULL);
81 
82  readNextChunk();
83 
84  swarm_header* curh = reinterpret_cast<swarm_header*>(readBytes(sizeof(swarm_header)));
85 
86  return _input.good() && curh && defh.is_compatible(*curh);
87 }
88 
89 
90 
91 logrecord binary_reader::next() {
92  const static gpulog::internal::header hend(-1,0);
93  const static int LOH = sizeof(int)*2;
94 
95  if(!ensure(LOH)) return logrecord((char*)&hend);
96  logrecord l(current);
97 
98  //std::cerr << "Len: " << l.len() << std::endl;
99  if(l.len() > MAX_RECORD_SIZE) {
100  std::cerr << "Trying to read very large error, probably wrong data: " << l.len() << std::endl;
101  throw std::runtime_error("Record is too large");
102  }
103 
104  // We have to re-read the logrecord since the current might move
105  logrecord ll(readBytes(l.len()));
106 
107  return ll;
108 }
109