YPC  0.2.0
blockfile.h
1 #pragma once
2 #include "ypc/core/exceptions.h"
3 #include "ypc/core/memref.h"
4 #include <cassert>
5 #include <cstdint>
6 #include <fstream>
7 #include <string>
8 #include <vector>
9 #ifdef HPDA_DEBUG
10 #include <glog/logging.h>
11 #endif
12 
13 namespace ypc {
14 
15 class invalid_blockfile : public std::exception {
16 public:
17  virtual const char *what() { return "wrong magic number"; }
18 };
19 // This is only for append and read, you cannot change any existed data
20 template <uint64_t MagicNumber_t, uint64_t BlockNumLimit_t,
21  uint64_t BlockSizeLimit_t>
22 class blockfile {
23 public:
24  const static uint64_t MagicNumber = MagicNumber_t;
25  const static uint64_t BlockSizeLimit = BlockSizeLimit_t;
26  const static uint64_t BlockNumLimit = BlockNumLimit_t;
27 
28  blockfile()
29  : m_file(), m_file_path(), m_header(), m_is_header_valid(false),
30  m_is_block_info_valid(false), m_block_infos() {}
31 
32  blockfile(const blockfile &) = delete;
33  blockfile(blockfile &&) = delete;
34  blockfile &operator=(const blockfile &) = delete;
35  blockfile &operator=(blockfile &&) = delete;
36  virtual ~blockfile() = default;
37 
38  void open_for_read(const char *file_path) {
39  if (m_file.is_open()) {
40  throw std::runtime_error("already open");
41  }
42  m_file_path = std::string(file_path);
43  m_file.open(file_path, std::ios::in | std::ios::binary);
44  if (!m_file.is_open()) {
45  throw file_open_failure(m_file_path, "blockfile::open_for_read");
46  }
47 
48  reset_read_item();
49  }
50  void open_for_write(const char *file_path) {
51  if (m_file.is_open()) {
52  throw std::runtime_error("already open");
53  }
54  m_file_path = std::string(file_path);
55  m_file.open(file_path, std::ios::out | std::ios::binary);
56  if (!m_file.is_open()) {
57  throw file_open_failure(m_file_path, "blockfile::open_for_write");
58  }
59  }
60 
61  template <typename ByteType>
62  int append_item(const ByteType *data, size_t len) {
63  static_assert(sizeof(ByteType) == 1);
64  return append_item((const char *)data, len);
65  }
66 
67  int append_item(const char *data, size_t len) {
68  // TODO: Check if reach limit
69  read_header();
70  read_all_block_info();
71  m_file.clear();
72  block_info bi{};
73 
74  m_header.item_number++;
75  m_header.magic_number = MagicNumber;
76 
77  if (m_block_infos.empty()) {
78  bi.start_item_index = 0;
79  bi.end_item_index = 1;
80  bi.start_file_pos = block_start_offset;
81  bi.end_file_pos = bi.start_file_pos + len + sizeof(len);
82  m_block_infos.push_back(bi);
83  m_header.block_number++;
84  } else {
85  bi = m_block_infos.back();
86  if (bi.end_item_index - bi.start_item_index >= BlockSizeLimit) {
87  auto back = m_block_infos.back();
88  bi.start_item_index = back.end_item_index;
89  bi.end_item_index = bi.start_item_index++;
90  bi.start_file_pos = back.end_file_pos;
91  bi.end_file_pos = bi.end_file_pos + len + sizeof(len);
92  m_block_infos.push_back(bi);
93  m_header.block_number++;
94  } else {
95  block_info &back = m_block_infos.back();
96  back.end_item_index++;
97  back.end_file_pos = back.end_file_pos + len + sizeof(len);
98  }
99  }
100  block_info &back = m_block_infos.back();
101  auto offset =
102  sizeof(header) + (m_block_infos.size() - 1) * sizeof(block_info);
103  m_file.seekp(offset, m_file.beg);
104  m_file.write((char *)&back, sizeof(back));
105  m_file.seekp(0, m_file.beg);
106  m_file.write((char *)&m_header, sizeof(m_header));
107 
108  m_file.seekp(back.end_file_pos - len - sizeof(len), m_file.beg);
109  m_file.write((char *)&len, sizeof(len));
110  m_file.write(data, len);
111  return 0;
112  }
113 
114  void reset_read_item() {
115  m_file.clear();
116  read_header();
117  read_all_block_info();
118  m_file.seekg(block_start_offset, m_file.beg);
119  }
120 
121  bool next_item(memref &s) {
122  if (m_file.eof()) {
123  return false;
124  }
125  size_t len;
126  m_file.read((char *)&len, sizeof(len));
127  if (m_file.eof()) {
128  return false;
129  }
130  if (s.data() == nullptr) {
131  s.alloc(len);
132  }
133  if (s.size() < len) {
134  s.dealloc();
135  s.alloc(len);
136  }
137 
138  m_file.read((char *)s.data(), len);
139  s.size() = len;
140  return true;
141  }
142 
143  uint64_t item_number() {
144  if (!m_is_header_valid) {
145  read_header();
146  }
147  return m_header.item_number;
148  }
149 
150  void close() {
151  m_is_header_valid = false;
152  m_is_block_info_valid = false;
153  m_block_infos.clear();
154  m_file.close();
155  }
156 
157 protected:
158  void read_header() {
159  if (m_is_header_valid) {
160  return;
161  }
162  auto prev = m_file.tellg();
163 
164  m_file.seekg(0, m_file.beg);
165  m_file.read((char *)&m_header, sizeof(header));
166  if (!m_file.eof() && m_header.magic_number != MagicNumber) {
167  throw invalid_blockfile();
168  }
169  m_is_header_valid = true;
170  m_file.seekg(prev, m_file.beg);
171  }
172  void read_all_block_info() {
173  read_header();
174  if (m_is_block_info_valid) {
175  return;
176  }
177  m_file.seekg(sizeof(header), m_file.beg);
178  for (size_t i = 0; i < m_header.block_number; ++i) {
179  block_info bi{};
180  m_file.read((char *)&bi, sizeof(bi));
181  m_block_infos.push_back(bi);
182  }
183  m_is_block_info_valid = true;
184  }
185 
186 protected:
187  struct header {
188  uint64_t magic_number;
189  uint64_t version_number;
190  uint64_t block_number;
191  uint64_t item_number;
192  };
193  struct block_info {
194  //[start_item_index, end_item_index)
195  block_info() = default;
196  uint64_t start_item_index;
197  uint64_t end_item_index; // not included
198  long int start_file_pos;
199  long int end_file_pos;
200  };
201  const static long int block_start_offset =
202  sizeof(header) + sizeof(block_info) * BlockNumLimit;
203 
204  std::fstream m_file;
205  std::string m_file_path;
206  header m_header;
207  bool m_is_header_valid;
208  bool m_is_block_info_valid;
209  std::vector<block_info> m_block_infos;
210 };
211 } // namespace ypc
ypc::blockfile::block_info
Definition: blockfile.h:193
ypc::memref
Definition: memref.h:6
ypc::invalid_blockfile
Definition: blockfile.h:15
ypc::blockfile
Definition: blockfile.h:22
ypc::file_open_failure
Definition: exceptions.h:43
ypc::blockfile::header
Definition: blockfile.h:187