servicedvb.h: Remove unused include filepush.h
[openblackhole/openblackhole-enigma2.git] / lib / dvb / filepush.cpp
1 #include "filepush.h"
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 //#define SHOW_WRITE_TIME
8
9 eFilePushThread::eFilePushThread(int blocksize, size_t buffersize):
10          m_sg(NULL),
11          m_stop(1),
12          m_send_pvr_commit(0),
13          m_stream_mode(0),
14          m_blocksize(blocksize),
15          m_buffersize(buffersize),
16          m_buffer((unsigned char *)malloc(buffersize)),
17          m_messagepump(eApp, 0),
18          m_run_state(0)
19 {
20         if (m_buffer == NULL)
21                 eFatal("[eFilePushThread] Failed to allocate %d bytes", buffersize);
22         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
23 }
24
25 eFilePushThread::~eFilePushThread()
26 {
27         stop(); /* eThread is borked, always call stop() from d'tor */
28         free(m_buffer);
29 }
30
/* Deliberately empty: the handler only has to exist so that SIGUSR1 is
   delivered (interrupting blocking syscalls) instead of being ignored or
   terminating the process. */
static void signal_handler(int /*signum*/)
{
}

/*
 * Install signal_handler for SIGUSR1 without SA_RESTART, so that a blocking
 * syscall interrupted by the signal fails with EINTR instead of being
 * restarted transparently.
 */
static void ignore_but_report_signals()
{
	/* Start from a fully zeroed structure and an explicitly empty mask:
	   sigaction() reads sa_mask, so leaving it uninitialized would block an
	   arbitrary set of signals while the handler runs. */
	struct sigaction act = {};
	sigemptyset(&act.sa_mask);
	act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
	act.sa_flags = 0; /* no SA_RESTART: we want to detect our signal */
	sigaction(SIGUSR1, &act, 0);
}
43
44 void eFilePushThread::thread()
45 {
46         ignore_but_report_signals();
47         hasStarted(); /* "start()" blocks until we get here */
48         setIoPrio(IOPRIO_CLASS_BE, 0);
49         eDebug("[eFilePushThread] START thread");
50
51         do
52         {
53         int eofcount = 0;
54         int buf_end = 0;
55         size_t bytes_read = 0;
56         off_t current_span_offset = 0;
57         size_t current_span_remaining = 0;
58
59         while (!m_stop)
60         {
61                 if (m_sg && !current_span_remaining)
62                 {
63                         m_sg->getNextSourceSpan(m_current_position, bytes_read, current_span_offset, current_span_remaining, m_blocksize);
64                         ASSERT(!(current_span_remaining % m_blocksize));
65                         m_current_position = current_span_offset;
66                         bytes_read = 0;
67                 }
68
69                 size_t maxread = m_buffersize;
70
71                         /* if we have a source span, don't read past the end */
72                 if (m_sg && maxread > current_span_remaining)
73                         maxread = current_span_remaining;
74
75                         /* align to blocksize */
76                 maxread -= maxread % m_blocksize;
77
78                 if (maxread)
79                 {
80 #ifdef SHOW_WRITE_TIME
81                         struct timeval starttime;
82                         struct timeval now;
83                         gettimeofday(&starttime, NULL);
84 #endif
85                         buf_end = m_source->read(m_current_position, m_buffer, maxread);
86 #ifdef SHOW_WRITE_TIME
87                         gettimeofday(&now, NULL);
88                         suseconds_t diff = (1000000 * (now.tv_sec - starttime.tv_sec)) + now.tv_usec - starttime.tv_usec;
89                         eDebug("[eFilePushThread] read %d bytes time: %9u us", buf_end, (unsigned int)diff);
90 #endif
91                 }
92                 else
93                         buf_end = 0;
94
95                 if (buf_end < 0)
96                 {
97                         buf_end = 0;
98                         /* Check m_stop after interrupted syscall. */
99                         if (m_stop) {
100                                 break;
101                         }
102                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
103                                 continue;
104                         if (errno == EOVERFLOW)
105                         {
106                                 eWarning("[eFilePushThread] OVERFLOW while playback?");
107                                 continue;
108                         }
109                         eDebug("[eFilePushThread] read error: %m");
110                 }
111
112                         /* a read might be mis-aligned in case of a short read. */
113                 int d = buf_end % m_blocksize;
114                 if (d)
115                         buf_end -= d;
116
117                 if (buf_end == 0)
118                 {
119                                 /* on EOF, try COMMITting once. */
120                         if (m_send_pvr_commit)
121                         {
122                                 struct pollfd pfd;
123                                 pfd.fd = m_fd_dest;
124                                 pfd.events = POLLIN;
125                                 switch (poll(&pfd, 1, 250)) // wait for 250ms
126                                 {
127                                         case 0:
128                                                 eDebug("[eFilePushThread] wait for driver eof timeout");
129                                                 continue;
130                                         case 1:
131                                                 eDebug("[eFilePushThread] wait for driver eof ok");
132                                                 break;
133                                         default:
134                                                 eDebug("[eFilePushThread] wait for driver eof aborted by signal");
135                                                 /* Check m_stop after interrupted syscall. */
136                                                 if (m_stop)
137                                                         break;
138                                                 continue;
139                                 }
140                         }
141
142                         if (m_stop)
143                                 break;
144
145                                 /* in stream_mode, we are sending EOF events
146                                    over and over until somebody responds.
147
148                                    in stream_mode, think of evtEOF as "buffer underrun occurred". */
149                         sendEvent(evtEOF);
150
151                         if (m_stream_mode)
152                         {
153                                 eDebug("[eFilePushThread] reached EOF, but we are in stream mode. delaying 1 second.");
154                                 sleep(1);
155                                 continue;
156                         }
157                         else if (++eofcount < 10)
158                         {
159                                 eDebug("[eFilePushThread] reached EOF, but the file may grow. delaying 1 second.");
160                                 sleep(1);
161                                 continue;
162                         }
163                         break;
164                 } else
165                 {
166                         /* Write data to mux */
167                         int buf_start = 0;
168                         filterRecordData(m_buffer, buf_end);
169                         while ((buf_start != buf_end) && !m_stop)
170                         {
171                                 int w = write(m_fd_dest, m_buffer + buf_start, buf_end - buf_start);
172
173                                 if (w <= 0)
174                                 {
175                                         /* Check m_stop after interrupted syscall. */
176                                         if (m_stop) {
177                                                 w = 0;
178                                                 buf_start = 0;
179                                                 buf_end = 0;
180                                                 break;
181                                         }
182                                         if (w < 0 && (errno == EINTR || errno == EAGAIN || errno == EBUSY))
183                                                 continue;
184                                         eDebug("[eFilePushThread] write: %m");
185                                         sendEvent(evtWriteError);
186                                         break;
187                                 }
188                                 buf_start += w;
189                         }
190
191                         eofcount = 0;
192                         m_current_position += buf_end;
193                         bytes_read += buf_end;
194                         if (m_sg)
195                                 current_span_remaining -= buf_end;
196                 }
197         }
198         sendEvent(evtStopped);
199
200         { /* mutex lock scope */
201                 eSingleLocker lock(m_run_mutex);
202                 m_run_state = 0;
203                 m_run_cond.signal(); /* Tell them we're here */
204                 while (m_stop == 2) {
205                         eDebug("[eFilePushThread] PAUSED");
206                         m_run_cond.wait(m_run_mutex);
207                 }
208                 if (m_stop == 0)
209                         m_run_state = 1;
210         }
211
212         } while (m_stop == 0);
213         eDebug("[eFilePushThread] STOP");
214 }
215
216 void eFilePushThread::start(ePtr<iTsSource> &source, int fd_dest)
217 {
218         m_source = source;
219         m_fd_dest = fd_dest;
220         m_current_position = 0;
221         m_run_state = 1;
222         m_stop = 0;
223         run();
224 }
225
226 void eFilePushThread::stop()
227 {
228         /* if we aren't running, don't bother stopping. */
229         if (m_stop == 1)
230                 return;
231         m_stop = 1;
232         eDebug("[eFilePushThread] stopping thread");
233         m_run_cond.signal(); /* Break out of pause if needed */
234         sendSignal(SIGUSR1);
235         kill(); /* Kill means join actually */
236 }
237
238 void eFilePushThread::pause()
239 {
240         if (m_stop == 1)
241         {
242                 eWarning("[eFilePushThread] pause called while not running");
243                 return;
244         }
245         /* Set thread into a paused state by setting m_stop to 2 and wait
246          * for the thread to acknowledge that */
247         eSingleLocker lock(m_run_mutex);
248         m_stop = 2;
249         sendSignal(SIGUSR1);
250         m_run_cond.signal(); /* Trigger if in weird state */
251         while (m_run_state) {
252                 eDebug("[eFilePushThread] waiting for pause");
253                 m_run_cond.wait(m_run_mutex);
254         }
255 }
256
257 void eFilePushThread::resume()
258 {
259         if (m_stop != 2)
260         {
261                 eWarning("[eFilePushThread] resume called while not paused");
262                 return;
263         }
264         /* Resume the paused thread by resetting the flag and
265          * signal the thread to release it */
266         eSingleLocker lock(m_run_mutex);
267         m_stop = 0;
268         m_run_cond.signal(); /* Tell we're ready to resume */
269 }
270
271 void eFilePushThread::enablePVRCommit(int s)
272 {
273         m_send_pvr_commit = s;
274 }
275
276 void eFilePushThread::setStreamMode(int s)
277 {
278         m_stream_mode = s;
279 }
280
281 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
282 {
283         m_sg = sg;
284 }
285
286 void eFilePushThread::sendEvent(int evt)
287 {
288         m_messagepump.send(evt);
289 }
290
291 void eFilePushThread::recvEvent(const int &evt)
292 {
293         m_event(evt);
294 }
295
296 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
297 {
298 }
299
300
301
302
303 eFilePushThreadRecorder::eFilePushThreadRecorder(unsigned char* buffer, size_t buffersize):
304         m_fd_source(-1),
305         m_buffersize(buffersize),
306         m_buffer(buffer),
307         m_overflow_count(0),
308         m_stop(1),
309         m_messagepump(eApp, 0)
310 {
311         CONNECT(m_messagepump.recv_msg, eFilePushThreadRecorder::recvEvent);
312 }
313
314 void eFilePushThreadRecorder::thread()
315 {
316         setIoPrio(IOPRIO_CLASS_RT, 7);
317
318         eDebug("[eFilePushThreadRecorder] THREAD START");
319
320         /* we set the signal to not restart syscalls, so we can detect our signal. */
321         struct sigaction act;
322         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
323         act.sa_flags = 0;
324         sigaction(SIGUSR1, &act, 0);
325
326         hasStarted();
327
328         /* m_stop must be evaluated after each syscall. */
329         while (!m_stop)
330         {
331                 ssize_t bytes = ::read(m_fd_source, m_buffer, m_buffersize);
332                 if (bytes < 0)
333                 {
334                         bytes = 0;
335                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
336                                 continue;
337                         if (errno == EOVERFLOW)
338                         {
339                                 eWarning("[eFilePushThreadRecorder] OVERFLOW while recording");
340                                 ++m_overflow_count;
341                                 continue;
342                         }
343                         eDebug("[eFilePushThreadRecorder] *read error* (%m) - aborting thread because i don't know what else to do.");
344                         sendEvent(evtReadError);
345                         break;
346                 }
347
348 #ifdef SHOW_WRITE_TIME
349                 struct timeval starttime;
350                 struct timeval now;
351                 gettimeofday(&starttime, NULL);
352 #endif
353                 int w = writeData(bytes);
354 #ifdef SHOW_WRITE_TIME
355                 gettimeofday(&now, NULL);
356                 suseconds_t diff = (1000000 * (now.tv_sec - starttime.tv_sec)) + now.tv_usec - starttime.tv_usec;
357                 eDebug("[eFilePushThreadRecorder] write %d bytes time: %9u us", bytes, (unsigned int)diff);
358 #endif
359                 if (w < 0)
360                 {
361                         eDebug("[eFilePushThreadRecorder] WRITE ERROR, aborting thread: %m");
362                         sendEvent(evtWriteError);
363                         break;
364                 }
365         }
366         flush();
367         sendEvent(evtStopped);
368         eDebug("[eFilePushThreadRecorder] THREAD STOP");
369 }
370
371 void eFilePushThreadRecorder::start(int fd)
372 {
373         m_fd_source = fd;
374         m_stop = 0;
375         run();
376 }
377
378 void eFilePushThreadRecorder::stop()
379 {
380         /* if we aren't running, don't bother stopping. */
381         if (m_stop == 1)
382                 return;
383         m_stop = 1;
384         eDebug("[eFilePushThreadRecorder] stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
385         sendSignal(SIGUSR1);
386         kill();
387 }
388
389 void eFilePushThreadRecorder::sendEvent(int evt)
390 {
391         m_messagepump.send(evt);
392 }
393
394 void eFilePushThreadRecorder::recvEvent(const int &evt)
395 {
396         m_event(evt);
397 }