0d473d52cde6927c0240431f5526885788e8f33b
[openblackhole/openblackhole-enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 //#define SHOW_WRITE_TIME
8
9 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize, size_t buffersize)
10         :prio_class(io_prio_class),
11          prio(io_prio_level),
12          m_sg(NULL),
13          m_stop(1),
14          m_send_pvr_commit(0),
15          m_stream_mode(0),
16          m_blocksize(blocksize),
17          m_buffersize(buffersize),
18          m_buffer((unsigned char *)malloc(buffersize)),
19          m_messagepump(eApp, 0),
20          m_run_state(0)
21 {
22         if (m_buffer == NULL)
23                 eFatal("[eFilePushThread] Failed to allocate %d bytes", buffersize);
24         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
25 }
26
27 eFilePushThread::~eFilePushThread()
28 {
29         stop(); /* eThread is borked, always call stop() from d'tor */
30         free(m_buffer);
31 }
32
33 static void signal_handler(int x)
34 {
35 }
36
37 static void ignore_but_report_signals()
38 {
39         /* we set the signal to not restart syscalls, so we can detect our signal. */
40         struct sigaction act;
41         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
42         act.sa_flags = 0;
43         sigaction(SIGUSR1, &act, 0);
44 }
45
46 void eFilePushThread::thread()
47 {
48         ignore_but_report_signals();
49         hasStarted(); /* "start()" blocks until we get here */
50         setIoPrio(prio_class, prio);
51         eDebug("[eFilePushThread] START thread");
52
53         do
54         {
55         int eofcount = 0;
56         int buf_end = 0;
57         size_t bytes_read = 0;
58         off_t current_span_offset = 0;
59         size_t current_span_remaining = 0;
60
61         while (!m_stop)
62         {
63                 if (m_sg && !current_span_remaining)
64                 {
65                         m_sg->getNextSourceSpan(m_current_position, bytes_read, current_span_offset, current_span_remaining, m_blocksize);
66                         ASSERT(!(current_span_remaining % m_blocksize));
67                         m_current_position = current_span_offset;
68                         bytes_read = 0;
69                 }
70
71                 size_t maxread = m_buffersize;
72
73                         /* if we have a source span, don't read past the end */
74                 if (m_sg && maxread > current_span_remaining)
75                         maxread = current_span_remaining;
76
77                         /* align to blocksize */
78                 maxread -= maxread % m_blocksize;
79
80                 if (maxread)
81                 {
82 #ifdef SHOW_WRITE_TIME
83                         struct timeval starttime;
84                         struct timeval now;
85                         gettimeofday(&starttime, NULL);
86 #endif
87                         buf_end = m_source->read(m_current_position, m_buffer, maxread);
88 #ifdef SHOW_WRITE_TIME
89                         gettimeofday(&now, NULL);
90                         suseconds_t diff = (1000000 * (now.tv_sec - starttime.tv_sec)) + now.tv_usec - starttime.tv_usec;
91                         eDebug("[eFilePushThread] read %d bytes time: %9u us", buf_end, (unsigned int)diff);
92 #endif
93                 }
94                 else
95                         buf_end = 0;
96
97                 if (buf_end < 0)
98                 {
99                         buf_end = 0;
100                         /* Check m_stop after interrupted syscall. */
101                         if (m_stop) {
102                                 break;
103                         }
104                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
105                                 continue;
106                         if (errno == EOVERFLOW)
107                         {
108                                 eWarning("[eFilePushThread] OVERFLOW while playback?");
109                                 continue;
110                         }
111                         eDebug("[eFilePushThread] read error: %m");
112                 }
113
114                         /* a read might be mis-aligned in case of a short read. */
115                 int d = buf_end % m_blocksize;
116                 if (d)
117                         buf_end -= d;
118
119                 if (buf_end == 0)
120                 {
121                                 /* on EOF, try COMMITting once. */
122                         if (m_send_pvr_commit)
123                         {
124                                 struct pollfd pfd;
125                                 pfd.fd = m_fd_dest;
126                                 pfd.events = POLLIN;
127                                 switch (poll(&pfd, 1, 250)) // wait for 250ms
128                                 {
129                                         case 0:
130                                                 eDebug("[eFilePushThread] wait for driver eof timeout");
131                                                 continue;
132                                         case 1:
133                                                 eDebug("[eFilePushThread] wait for driver eof ok");
134                                                 break;
135                                         default:
136                                                 eDebug("[eFilePushThread] wait for driver eof aborted by signal");
137                                                 /* Check m_stop after interrupted syscall. */
138                                                 if (m_stop)
139                                                         break;
140                                                 continue;
141                                 }
142                         }
143
144                         if (m_stop)
145                                 break;
146
147                                 /* in stream_mode, we are sending EOF events
148                                    over and over until somebody responds.
149
150                                    in stream_mode, think of evtEOF as "buffer underrun occurred". */
151                         sendEvent(evtEOF);
152
153                         if (m_stream_mode)
154                         {
155                                 eDebug("[eFilePushThread] reached EOF, but we are in stream mode. delaying 1 second.");
156                                 sleep(1);
157                                 continue;
158                         }
159                         else if (++eofcount < 10)
160                         {
161                                 eDebug("[eFilePushThread] reached EOF, but the file may grow. delaying 1 second.");
162                                 sleep(1);
163                                 continue;
164                         }
165                         break;
166                 } else
167                 {
168                         /* Write data to mux */
169                         int buf_start = 0;
170                         filterRecordData(m_buffer, buf_end);
171                         while ((buf_start != buf_end) && !m_stop)
172                         {
173                                 int w = write(m_fd_dest, m_buffer + buf_start, buf_end - buf_start);
174
175                                 if (w <= 0)
176                                 {
177                                         /* Check m_stop after interrupted syscall. */
178                                         if (m_stop) {
179                                                 w = 0;
180                                                 buf_start = 0;
181                                                 buf_end = 0;
182                                                 break;
183                                         }
184                                         if (w < 0 && (errno == EINTR || errno == EAGAIN || errno == EBUSY))
185                                                 continue;
186                                         eDebug("[eFilePushThread] write: %m");
187                                         sendEvent(evtWriteError);
188                                         break;
189                                 }
190                                 buf_start += w;
191                         }
192
193                         eofcount = 0;
194                         m_current_position += buf_end;
195                         bytes_read += buf_end;
196                         if (m_sg)
197                                 current_span_remaining -= buf_end;
198                 }
199         }
200         sendEvent(evtStopped);
201
202         { /* mutex lock scope */
203                 eSingleLocker lock(m_run_mutex);
204                 m_run_state = 0;
205                 m_run_cond.signal(); /* Tell them we're here */
206                 while (m_stop == 2) {
207                         eDebug("[eFilePushThread] PAUSED");
208                         m_run_cond.wait(m_run_mutex);
209                 }
210                 if (m_stop == 0)
211                         m_run_state = 1;
212         }
213
214         } while (m_stop == 0);
215         eDebug("[eFilePushThread] STOP");
216 }
217
218 void eFilePushThread::start(ePtr<iTsSource> &source, int fd_dest)
219 {
220         m_source = source;
221         m_fd_dest = fd_dest;
222         m_current_position = 0;
223         m_run_state = 1;
224         m_stop = 0;
225         run();
226 }
227
228 void eFilePushThread::stop()
229 {
230         /* if we aren't running, don't bother stopping. */
231         if (m_stop == 1)
232                 return;
233         m_stop = 1;
234         eDebug("[eFilePushThread] stopping thread");
235         m_run_cond.signal(); /* Break out of pause if needed */
236         sendSignal(SIGUSR1);
237         kill(); /* Kill means join actually */
238 }
239
240 void eFilePushThread::pause()
241 {
242         if (m_stop == 1)
243         {
244                 eWarning("[eFilePushThread] pause called while not running");
245                 return;
246         }
247         /* Set thread into a paused state by setting m_stop to 2 and wait
248          * for the thread to acknowledge that */
249         eSingleLocker lock(m_run_mutex);
250         m_stop = 2;
251         sendSignal(SIGUSR1);
252         m_run_cond.signal(); /* Trigger if in weird state */
253         while (m_run_state) {
254                 eDebug("[eFilePushThread] waiting for pause");
255                 m_run_cond.wait(m_run_mutex);
256         }
257 }
258
259 void eFilePushThread::resume()
260 {
261         if (m_stop != 2)
262         {
263                 eWarning("[eFilePushThread] resume called while not paused");
264                 return;
265         }
266         /* Resume the paused thread by resetting the flag and
267          * signal the thread to release it */
268         eSingleLocker lock(m_run_mutex);
269         m_stop = 0;
270         m_run_cond.signal(); /* Tell we're ready to resume */
271 }
272
273 void eFilePushThread::enablePVRCommit(int s)
274 {
275         m_send_pvr_commit = s;
276 }
277
278 void eFilePushThread::setStreamMode(int s)
279 {
280         m_stream_mode = s;
281 }
282
283 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
284 {
285         m_sg = sg;
286 }
287
288 void eFilePushThread::sendEvent(int evt)
289 {
290         m_messagepump.send(evt);
291 }
292
293 void eFilePushThread::recvEvent(const int &evt)
294 {
295         m_event(evt);
296 }
297
298 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
299 {
300 }
301
302
303
304
305 eFilePushThreadRecorder::eFilePushThreadRecorder(unsigned char* buffer, size_t buffersize):
306         m_fd_source(-1),
307         m_buffersize(buffersize),
308         m_buffer(buffer),
309         m_overflow_count(0),
310         m_stop(1),
311         m_messagepump(eApp, 0)
312 {
313         CONNECT(m_messagepump.recv_msg, eFilePushThreadRecorder::recvEvent);
314 }
315
316 void eFilePushThreadRecorder::thread()
317 {
318         setIoPrio(IOPRIO_CLASS_RT, 7);
319
320         eDebug("[eFilePushThreadRecorder] THREAD START");
321
322         /* we set the signal to not restart syscalls, so we can detect our signal. */
323         struct sigaction act;
324         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
325         act.sa_flags = 0;
326         sigaction(SIGUSR1, &act, 0);
327
328         hasStarted();
329
330         /* m_stop must be evaluated after each syscall. */
331         while (!m_stop)
332         {
333                 ssize_t bytes = ::read(m_fd_source, m_buffer, m_buffersize);
334                 if (bytes < 0)
335                 {
336                         bytes = 0;
337                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
338                                 continue;
339                         if (errno == EOVERFLOW)
340                         {
341                                 eWarning("[eFilePushThreadRecorder] OVERFLOW while recording");
342                                 ++m_overflow_count;
343                                 continue;
344                         }
345                         eDebug("[eFilePushThreadRecorder] *read error* (%m) - aborting thread because i don't know what else to do.");
346                         sendEvent(evtReadError);
347                         break;
348                 }
349
350 #ifdef SHOW_WRITE_TIME
351                 struct timeval starttime;
352                 struct timeval now;
353                 gettimeofday(&starttime, NULL);
354 #endif
355                 int w = writeData(bytes);
356 #ifdef SHOW_WRITE_TIME
357                 gettimeofday(&now, NULL);
358                 suseconds_t diff = (1000000 * (now.tv_sec - starttime.tv_sec)) + now.tv_usec - starttime.tv_usec;
359                 eDebug("[eFilePushThreadRecorder] write %d bytes time: %9u us", bytes, (unsigned int)diff);
360 #endif
361                 if (w < 0)
362                 {
363                         eDebug("[eFilePushThreadRecorder] WRITE ERROR, aborting thread: %m");
364                         sendEvent(evtWriteError);
365                         break;
366                 }
367         }
368         flush();
369         sendEvent(evtStopped);
370         eDebug("[eFilePushThreadRecorder] THREAD STOP");
371 }
372
373 void eFilePushThreadRecorder::start(int fd)
374 {
375         m_fd_source = fd;
376         m_stop = 0;
377         run();
378 }
379
380 void eFilePushThreadRecorder::stop()
381 {
382         /* if we aren't running, don't bother stopping. */
383         if (m_stop == 1)
384                 return;
385         m_stop = 1;
386         eDebug("[eFilePushThreadRecorder] stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
387         sendSignal(SIGUSR1);
388         kill();
389 }
390
391 void eFilePushThreadRecorder::sendEvent(int evt)
392 {
393         m_messagepump.send(evt);
394 }
395
396 void eFilePushThreadRecorder::recvEvent(const int &evt)
397 {
398         m_event(evt);
399 }