00001 /* -*- mode: c++; c-basic-offset: 4; -*- */ 00002 #ifndef SENDS_TRANSACTION_HH 00003 #define SENDS_TRANSACTION_HH 00004 00005 //#include "tbb/concurrent_queue.h" 00006 #include "mt_queue.hh" 00007 #include "socket_api.hh" 00008 #include "user_data.hh" 00009 #include "chan_list.hh" 00010 #include "segdb.hh" 00011 #include <vector> 00012 #include <iosfwd> 00013 00014 namespace sends { 00015 00016 class transaction; 00017 class buffer; 00018 //typedef tbb::concurrent_queue<transaction*> tran_queue; 00019 typedef thread::mt_queue<transaction*> tran_queue; 00020 00030 class transaction { 00031 public: 00032 00036 typedef chan_list::chan_index chan_index; 00037 00041 typedef unsigned long gps_type; 00042 00048 class chan_stat { 00049 chan_index _chinx; 00050 gps_type _fillpt; 00051 public: 00056 chan_stat(chan_index chn); 00057 00062 chan_stat(void); 00063 00070 gps_type fill(void) const; 00071 00076 chan_index index(void) const; 00077 00084 bool ready(gps_type stop) const; 00085 00090 void set_fill(gps_type t); 00091 }; 00092 00095 typedef std::vector<chan_stat> cinx_vect; 00096 00101 typedef seg_db::seg_index seg_index; 00102 00106 typedef std::vector<seg_index> seg_vect; 00107 00111 enum command_code { 00112 kNoop, 00113 kGetStatus, 00114 kGetData, 00115 kGetServerStatus, 00116 kGetChannels, 00117 kQuit, 00118 kCommandCount 00119 }; 00120 00121 public: 00127 transaction(tran_queue& retQ); 00128 00131 ~transaction(void); 00132 00137 long block_seq(void); 00138 00143 void add_channel(chan_index inx); 00144 00147 void clear(void); 00148 00152 void clear_channels(void); 00153 00157 command_code command(void) const; 00158 00165 std::ostream& dump(std::ostream& out) const; 00166 00172 gps_type getChanFill(int i) const; 00173 00180 chan_index getChanIndex(int i) const; 00181 00188 bool getChanReady(int i) const; 00189 00195 static command_code getCommandCode(const std::string& str); 00196 00202 static const char* getCommandString(command_code code); 00203 00208 int getNChan(void) const; 00209 00214 long get_stage(void) const; 00215 00220 gps_type getStartGPS(void) const; 00221 00226 gps_type getStopGPS(void) const; 00227 00232 gps_type getStrideEnd(void) const; 00233 00238 long ident(void) const; 00239 00247 bool next_stride(void); 00248 00255 int put_response(int resp); 00256 00262 socket_api& ref_socket(void); 00263 00269 user_data& ref_user(void); 00270 00276 int read(buffer& buf); 00277 00282 void error_reply(const std::string& err); 00283 00288 void reply(const buffer& reply); 00289 00294 void release(void); 00295 00299 void release_segments(void); 00300 00306 void release_segments(gps_type before); 00307 00312 void reserve_segments(void); 00313 00318 void setCommand(command_code cmd); 00319 00325 void setCommand(const std::string& cmd); 00326 00331 void set_id(long tranid); 00332 00337 void set_socket(socket_api* sock); 00338 00343 void set_stage(long s); 00344 00349 void set_user(const user_data& user); 00350 00360 void setTime(gps_type start, gps_type stop, gps_type stride=0); 00361 00366 gps_type stride(void) const; 00367 private: 00368 long trans_id; 00369 tran_queue* home; 00370 socket_api* trans_sock; 00371 user_data trans_user; 00372 command_code trans_code; 00373 gps_type trans_start; 00374 gps_type trans_stop; 00375 gps_type trans_stride; 00376 cinx_vect trans_channels; 00377 seg_vect trans_segments; 00378 long trans_blockid; 00379 long stage; 00380 }; 00381 00382 00383 //================================== transaction::chan_stat inline methods 00384 inline 00385 transaction::chan_stat::chan_stat(chan_index chn) 00386 : _chinx(chn), _fillpt(0) 00387 {} 00388 00389 inline 00390 transaction::chan_stat::chan_stat(void) { 00391 } 00392 00393 inline transaction::gps_type 00394 transaction::chan_stat::fill(void) const { 00395 return _fillpt; 00396 } 00397 00398 inline transaction::chan_index 00399 transaction::chan_stat::index(void) const { 00400 return _chinx; 00401 } 00402 00403 inline bool 00404 transaction::chan_stat::ready(gps_type t) const { 00405 return _fillpt >= t; 00406 } 00407 00408 inline void 00409 transaction::chan_stat::set_fill(gps_type t) { 00410 _fillpt = t; 00411 } 00412 00413 //================================== transction inline methods 00414 inline long 00415 transaction::block_seq(void) { 00416 return trans_blockid++; 00417 } 00418 00419 inline transaction::command_code 00420 transaction::command(void) const { 00421 return trans_code; 00422 } 00423 00424 inline transaction::gps_type 00425 transaction::getChanFill(int i) const { 00426 gps_type t = trans_channels[i].fill(); 00427 if (t < trans_start) t = trans_start; 00428 return t; 00429 } 00430 00431 inline transaction::chan_index 00432 transaction::getChanIndex(int i) const { 00433 return trans_channels[i].index(); 00434 } 00435 00436 inline bool 00437 transaction::getChanReady(int i) const { 00438 return trans_channels[i].ready(trans_start + trans_stride); 00439 } 00440 00441 inline int 00442 transaction::getNChan(void) const { 00443 return trans_channels.size(); 00444 } 00445 00446 inline long 00447 transaction::get_stage(void) const { 00448 return stage; 00449 } 00450 00451 inline transaction::gps_type 00452 transaction::getStartGPS(void) const { 00453 return trans_start; 00454 } 00455 00456 inline transaction::gps_type 00457 transaction::getStopGPS(void) const { 00458 return trans_stop; 00459 } 00460 00461 inline transaction::gps_type 00462 transaction::getStrideEnd(void) const { 00463 return trans_start + trans_stride; 00464 } 00465 00466 inline long 00467 transaction::ident(void) const { 00468 return trans_id; 00469 } 00470 00471 inline socket_api& 00472 transaction::ref_socket(void) { 00473 return *trans_sock; 00474 } 00475 00476 inline user_data& 00477 transaction::ref_user(void) { 00478 return trans_user; 00479 } 00480 inline transaction::gps_type 00481 transaction::stride(void) const { 00482 return trans_stride; 00483 } 00484 00485 } 00486 00487 #endif // !defined(SENDS_TRANSACTION_HH)
1.5.5