00001
00002 #ifndef SENDS_TEMPLATE_STAGE_HH
00003 #define SENDS_TEMPLATE_STAGE_HH
00004
00005 #include "seda_stage.hh"
00006 #include <vector>
00007 #include <gmutex.hh>
00008
00009 namespace sends {
00010
00011
00025 class concentrator : public seda_stage {
00026 public:
00036 concentrator(int nThread, int nTrans);
00037
00040 virtual ~concentrator(void);
00041
00046 void add_transaction(transaction* trans, double timeout);
00047
00053 void filter(transaction* trans);
00054
00059 void init_thread(void);
00060
00066 const char* stage_name(void) const;
00067
00068 private:
00069 void lock(void);
00070 void unlock(void);
00071 bool try_collect(void);
00072 void release_collect(void);
00073
00074 private:
00075 struct hold {
00076 hold(transaction* t, const timespec& t);
00077 timespec timeout;
00078 transaction* trans;
00079 int socket;
00080 bool collected;
00081 };
00082
00083 typedef std::vector<hold> hold_vect;
00084 typedef hold_vect::iterator hold_iter;
00085
00086 private:
00087 double mTimeout;
00088
00089 thread::mutex gate;
00090 volatile bool collector;
00091 hold_vect trans_vect;
00092 };
00093 }
00094
00095 #endif // !defined(SENDS_CONCENTRATOR_HH)