Bonjour,
Je suis en train d'implémenter un module de lecture/écriture optimisé permettant à la lecture, le traitement et l'écriture ligne par ligne, de se dérouler en parallèle : des threads de lecture remplissent ligne par ligne chacun une FIFO, qui sont lues par un autre thread de traitement qui remplit avec les résultats une FIFO de sortie, à partir de laquelle le fichier de sortie sera écrit par un autre thread d'écriture.
Pour le test ci-dessous, je n'ai fait qu'une lecture, et le traitement se résume à une simple copie. Je me trouve face aux problèmes suivants :
- Sous Windows et Linux : le programme ne se termine pas 3 fois sur 5 (les chances de réussite augmentent bizarrement avec le nombre d'opérations entre les deux FIFO), et génère de temps à autres (inexcusable quand même) une SIGSEV.
- Sous Linux, j'ai droit à du SIGABRT à la création du thread, et après le thread à "memory clobbered before allocated block" (soit une autre SIGABRT) aléatoirement dans les quinze premières lignes.
Croyez bien que j'ai HORREUR de poser mon problème et de vous inviter à réfléchir mais j'ai épuisé mes ressources... Je m'en excuse donc d'avance, en espérant une réponse qui me sortirait d'un beau pétrin.
Main.cpp :
Io.hpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include "io.hpp" #include <iostream> int main(int argc, char *argv[]) { CSV::Reader reader; CSV::Writer writer; if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) { CSV::Row row; reader.run(); //Reads the CSV file and fills the read queue writer.run(); //Reads the to-be-written queue and writes it to a txt file //The loop is supposed to end only if the reader is finished and empty while(!(reader.is_finished() && reader.empty())) { //Transfers line by line from the read to the to-be-written queues reader.wait_and_pop(row); writer.push(row); } //The reader will likely finish before the writer, so he has to finish his queue before continuing. writer.finish(); } else { std::cout << "File error"; } return EXIT_SUCCESS; }
Io.cpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 #ifndef IO_H_INCLUDED #define IO_H_INCLUDED #include "threads.hpp" #include <fstream> namespace CSV { class Row { std::vector<std::string> m_data; friend class Iterator; friend void write_row(Row const &row, std::ostream &stream); void read_next(std::istream& csv); public: inline std::string const& operator[](std::size_t index) const { return m_data[index]; } inline std::size_t size() const { return m_data.size(); } }; /** Reading *************************************************************************/ class Iterator { public: Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) { ++(*this); } Iterator() : m_csv(NULL) {} //Pre-Increment Iterator& operator++() { if (m_csv != NULL) { m_row.read_next(*m_csv); m_csv = m_csv->good() ? m_csv : NULL; } return *this; } inline Row const& operator*() const { return m_row; } inline bool operator==(Iterator const& rhs) { return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL))); } inline bool operator!=(Iterator const& rhs) { return !((*this) == rhs); } private: std::istream* m_csv; Row m_row; }; class Reader : public Concurrent_queue<Row>, public Thread { std::ifstream m_csv; Thread_safe_value<bool> m_finished; void work() { if(!!m_csv) { for(Iterator it(m_csv) ; it != Iterator() ; ++it) { push(*it); } m_finished.set(true); } } public: Reader() { m_finished.set(false); } inline bool open(std::string path) { m_csv.open(path.c_str()); return !!m_csv; } inline bool is_finished() { return m_finished.get(); } }; /** Writing ***************************************************************************/ void write_row(Row const &row, std::ostream &stream); //Is m_finishing really thread-safe ? By the way, is it mandatory ? class Writer : public Concurrent_queue<Row>, public Thread { std::ofstream m_csv; Thread_safe_value<bool> m_finishing; void work() { if(!!m_csv) { CSV::Row row; while(!(m_finishing.get() && empty())) { wait_and_pop(row); write_row(row, m_csv); } } } public: Writer() { m_finishing.set(false); } inline void finish() { m_finishing.set(true); catch_up(); } inline bool open(std::string path) { m_csv.open(path.c_str()); return !!m_csv; } }; } #endif
Threads.hpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 #include "io.hpp" #include <boost/bind.hpp> #include <boost/tokenizer.hpp> void CSV::Row::read_next(std::istream& csv) { std::string row; std::getline(csv, row); boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"')); m_data.assign(tokenizer.begin(), tokenizer.end()); } void CSV::write_row(Row const &row, std::ostream &stream) { std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";")); stream << std::endl; }
D'avance, merci.
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 #ifndef THREADS_HPP_INCLUDED #define THREADS_HPP_INCLUDED #include <boost/bind.hpp> #include <boost/thread.hpp> class Thread { protected: boost::thread *m_thread; virtual void work() = 0; void do_work() { work(); } public: Thread() : m_thread(NULL) {} virtual ~Thread() { catch_up(); if(m_thread != NULL) { delete m_thread; } } inline void catch_up() { if(m_thread != NULL) { m_thread->join(); } } void run() { m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this))); } }; /** Thread-safe datas **********************************************************/ #include <queue> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> template <class T> class Thread_safe_value : public boost::noncopyable { T m_value; boost::mutex m_mutex; public: T const &get() { boost::mutex::scoped_lock lock(m_mutex); return m_value; } void set(T const &value) { boost::mutex::scoped_lock lock(m_mutex); m_value = value; } }; template<typename Data> class Concurrent_queue { std::queue<Data> m_queue; mutable boost::mutex m_mutex; boost::condition_variable m_cond; public: void push(Data const& data) { boost::mutex::scoped_lock lock(m_mutex); m_queue.push(data); lock.unlock(); m_cond.notify_one(); } bool empty() const { boost::mutex::scoped_lock lock(m_mutex); return m_queue.empty(); } void wait_and_pop(Data& popped) { boost::mutex::scoped_lock lock(m_mutex); while(m_queue.empty()) { m_cond.wait(lock); } popped = m_queue.front(); m_queue.pop(); } }; #endif // THREAD_HPP_INCLUDED
Cordialement,
Kidpaddle2.
P.S: Concurrent_queue est le seul élément qui n'est pas de moi : http://www.justsoftwaresolutions.co....variables.html. J'ai confiance en son efficacité.
Partager