Task improvements, run fsck in background
[openblackhole/openblackhole-enigma2.git] / lib / python / Components / Task.py
1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
3
4 from Tools.CList import CList
5
class Job(object):
        """A named, ordered list of tasks that are run one after another.

        Tasks register themselves through addTask().  start() stores the
        completion callback and runs the tasks in order.  The callback is
        invoked as callback(job, task, problems): with (job, None, []) once
        every task, including resident ones, has finished, or with the failing
        task and its list of unmet conditions when a task reports problems.
        """
        NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)

        def __init__(self, name):
                self.tasks = [ ]
                # indices (into self.tasks) of tasks that keep running in the
                # background while later tasks are executed
                self.resident_tasks = [ ]
                self.workspace = "/tmp"
                # index of the task currently run; len(self.tasks) when all are done
                self.current_task = 0
                self.callback = None
                self.name = name
                self.finished = False
                # value reported by getProgress() when every task is done
                self.end = 100
                self.__progress = 0
                # factor mapping the summed task weightings onto 0..self.end
                self.weightScale = 1
                self.afterEvent = None

                # listeners called (without arguments) on every state/progress change
                self.state_changed = CList()

                self.status = self.NOT_STARTED

        # description is a dict
        def fromDescription(self, description):
                pass

        def createDescription(self):
                return None

        def getProgress(self):
                """Return the overall progress as an int, scaled by weightScale."""
                if self.current_task >= len(self.tasks):
                        return self.end
                t = self.tasks[self.current_task]
                finished_weight = sum([task.weighting for task in self.tasks[:self.current_task]])
                # a task with end == 0 contributes nothing instead of dividing by zero
                current_weight = t.weighting * t.progress / float(t.end) if t.end else 0
                return int((current_weight + finished_weight) * self.weightScale)

        progress = property(getProgress)

        def getStatustext(self):
                """Return the translated, human readable text for self.status."""
                return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status]

        def task_progress_changed_CB(self):
                self.state_changed()

        def addTask(self, task):
                """Append task to this job and hook up its progress notification."""
                task.job = self
                task.task_progress_changed = self.task_progress_changed_CB
                self.tasks.append(task)

        def start(self, callback):
                """Remember callback and start running the tasks."""
                assert self.callback is None
                self.callback = callback
                self.restart()

        def restart(self):
                """(Re)start execution at self.current_task."""
                # The scale has to be known before listeners are notified and
                # before the first task reports progress, so compute it first.
                sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
                self.weightScale = self.end / float(sumTaskWeightings)
                self.status = self.IN_PROGRESS
                self.state_changed()
                self.runNext()

        def runNext(self):
                """Run the current task, or finish the job when none is left."""
                if self.current_task >= len(self.tasks):
                        if not self.resident_tasks:
                                # clear the callback before calling it, so the
                                # job can be started again from inside it
                                cb = self.callback
                                self.callback = None
                                self.status = self.FINISHED
                                self.state_changed()
                                cb(self, None, [])
                        else:
                                print("still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks)))
                else:
                        self.tasks[self.current_task].run(self.taskCallback)
                        self.state_changed()

        def taskCallback(self, task, res, stay_resident = False):
                """Handle a report from task.

                res is the list of conditions that were not met (empty on
                success).  stay_resident=True means the task keeps running in
                the background while the job moves on to the next task.
                """
                cb_idx = self.tasks.index(task)
                if stay_resident:
                        if cb_idx in self.resident_tasks:
                                print("task keeps staying resident: %s" % (task,))
                                return
                        # remember the index of the reporting task itself
                        self.resident_tasks.append(cb_idx)
                        print("task going resident: %s" % (task,))
                if len(res):
                        print(">>> Error: %s" % (res,))
                        self.status = self.FAILED
                        self.state_changed()
                        self.callback(self, task, res)
                if cb_idx != self.current_task:
                        # Report from a task that is not the current one, i.e. a
                        # resident task.  The current task is still running (or
                        # there is none left), so current_task must not advance.
                        if cb_idx in self.resident_tasks:
                                print("resident task finished: %s" % (task,))
                                self.resident_tasks.remove(cb_idx)
                        if res == []:
                                self.state_changed()
                                if self.current_task >= len(self.tasks):
                                        # all regular tasks are done already;
                                        # let runNext decide whether the job
                                        # is complete now
                                        self.runNext()
                        return
                if res == []:
                        self.state_changed()
                        self.current_task += 1
                        self.runNext()

        def retry(self):
                assert self.status == self.FAILED
                self.restart()

        def abort(self):
                """Abort the current task and all resident tasks."""
                if self.current_task < len(self.tasks):
                        self.tasks[self.current_task].abort()
                # Iterate over a copy: aborting a task reports back through
                # taskCallback, which removes entries from resident_tasks.
                for i in self.resident_tasks[:]:
                        self.tasks[i].abort()

        def cancel(self):
                # some Jobs might have a better idea of how to cancel a job
                self.abort()

        def __str__(self):
                return "Components.Task.Job name=%s #tasks=%s" % (self.name, len(self.tasks))
117
class Task(object):
        """One step of a Job: the run of an external tool plus its checks.

        A task registers itself with the given job on construction.  run()
        checks the preconditions, starts the command and later reports the
        outcome through callback(task, not_met), where not_met is the list of
        conditions that were not fulfilled (empty on success).
        """
        def __init__(self, job, name):
                self.name = name
                self.immediate_preconditions = [ ]
                self.global_preconditions = [ ]
                self.postconditions = [ ]
                self.returncode = None
                # data written to the process right after it has been started
                self.initial_input = None
                self.job = None

                self.end = 100
                self.weighting = 100
                self.__progress = 0
                self.cmd = None
                self.cwd = "/tmp"
                self.args = [ ]
                self.cmdline = None
                self.task_progress_changed = None
                # buffer holding output that is not terminated by a newline yet
                self.output_line = ""
                self.container = None
                # Set by run(); initialised here so that abort()/finish() on a
                # task that was never started does not raise AttributeError.
                self.callback = None
                job.addTask(self)

        def setCommandline(self, cmd, args):
                self.cmd = cmd
                self.args = args

        def setTool(self, tool):
                """Run tool without arguments; require it to exist and to return 0."""
                self.cmd = tool
                self.args = [tool]
                self.global_preconditions.append(ToolExistsPrecondition())
                self.postconditions.append(ReturncodePostcondition())

        def setCmdline(self, cmdline):
                self.cmdline = cmdline

        def checkPreconditions(self, immediate = False):
                """Return the immediate or the global preconditions that are not met."""
                if immediate:
                        preconditions = self.immediate_preconditions
                else:
                        preconditions = self.global_preconditions
                return [precondition for precondition in preconditions if not precondition.check(self)]

        def _run(self):
                """Start the command in a new eConsoleAppContainer."""
                from enigma import eConsoleAppContainer
                self.container = eConsoleAppContainer()
                self.container.appClosed.append(self.processFinished)
                self.container.stdoutAvail.append(self.processStdout)
                self.container.stderrAvail.append(self.processStderr)
                if self.cwd is not None:
                        self.container.setCWD(self.cwd)
                if not self.cmd and self.cmdline:
                        # only a complete command line was given
                        print("execute: %s %s" % (self.container.execute(self.cmdline), self.cmdline))
                else:
                        assert self.cmd is not None
                        assert len(self.args) >= 1
                        print("execute: %s %s" % (self.container.execute(self.cmd, *self.args), ' '.join(self.args)))
                if self.initial_input:
                        self.writeInput(self.initial_input)

        def run(self, callback):
                """Check all preconditions and start the task.

                If a precondition fails, callback(self, failed_preconditions)
                is called right away and the task is not started.
                """
                failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
                if failed_preconditions:
                        callback(self, failed_preconditions)
                        return
                self.prepare()
                self.callback = callback
                self._run()

        def prepare(self):
                pass

        def cleanup(self, failed):
                pass

        def processStdout(self, data):
                self.processOutput(data)

        def processStderr(self, data):
                self.processOutput(data)

        def processOutput(self, data):
                """Buffer data and pass each complete line to processOutputLine."""
                self.output_line += data
                while True:
                        i = self.output_line.find('\n')
                        if i == -1:
                                break
                        # the line is passed on including its trailing newline
                        self.processOutputLine(self.output_line[:i+1])
                        self.output_line = self.output_line[i+1:]

        def processOutputLine(self, line):
                print("[Task %s] %s" % (self.name, line))

        def processFinished(self, returncode):
                self.returncode = returncode
                self.finish()

        def abort(self):
                """Kill the running process (if any) and finish as aborted."""
                if self.container:
                        self.container.kill()
                self.finish(aborted = True)

        def finish(self, aborted = False):
                """Evaluate the postconditions and report the result.

                When aborted, the result is a single AbortedPostcondition and
                the regular postconditions are not checked.
                """
                self.afterRun()
                not_met = [ ]
                if aborted:
                        not_met.append(AbortedPostcondition())
                else:
                        for postcondition in self.postconditions:
                                if not postcondition.check(self):
                                        not_met.append(postcondition)
                self.cleanup(not_met)
                # no callback is registered if the task was never run
                if self.callback is not None:
                        self.callback(self, not_met)

        def afterRun(self):
                pass

        def writeInput(self, input):
                self.container.write(input)

        def getProgress(self):
                return self.__progress

        def setProgress(self, progress):
                """Store progress clamped to 0..self.end and notify the listener."""
                if progress > self.end:
                        progress = self.end
                if progress < 0:
                        progress = 0
                self.__progress = progress
                if self.task_progress_changed:
                        self.task_progress_changed()

        progress = property(getProgress, setProgress)

        def __str__(self):
                return "Components.Task.Task name=%s" % (self.name)
258
class PythonTask(Task):
        """Task that runs the Python method work() in a thread instead of a tool.

        Subclasses override work().  Its result (or failure) is wrapped in a
        FailedPostcondition, so the task succeeds when work() returns None or 0.
        self.pos is copied into the task progress by a timer started with a
        5 second interval; self.aborted is set by abort().
        NOTE(review): work() is presumably expected to update self.pos and to
        poll self.aborted itself -- nothing here interrupts the thread.
        """
        def _run(self):
                from twisted.internet import threads, task
                self.aborted = False
                self.pos = 0
                threads.deferToThread(self.work).addBoth(self.onComplete)
                self.timer = task.LoopingCall(self.onTimer)
                self.timer.start(5, False)

        def work(self):
                # NotImplemented is not an exception; raise the proper error type
                raise NotImplementedError("work")

        def abort(self):
                self.aborted = True
                # no callback means the task was never run: finish right away
                if getattr(self, "callback", None) is None:
                        self.finish(aborted = True)

        def onTimer(self):
                self.setProgress(self.pos)

        def onComplete(self, result):
                # Drop the verdict of an earlier run, otherwise a retry after
                # one failure could never succeed.
                self.postconditions[:] = [p for p in self.postconditions if not isinstance(p, FailedPostcondition)]
                self.postconditions.append(FailedPostcondition(result))
                self.timer.stop()
                del self.timer
                self.finish()
280
281
# The JobManager executes multiple jobs, one after another.
# Later, it will also support suspending jobs (and continuing them after reboot etc.).
# It also supports a notification when some error occurred, and possibly a retry.
class JobManager:
        """Queue of jobs that are run one at a time."""
        def __init__(self):
                # jobs waiting to be started, in the order they were added
                self.active_jobs = [ ]
                self.failed_jobs = [ ]
                self.job_classes = [ ]
                # when set, a JobView for the job is opened once it is done
                self.in_background = False
                # the job currently running, or None
                self.active_job = None

        def AddJob(self, job):
                """Queue job and start it right away if no job is running."""
                self.active_jobs.append(job)
                self.kick()

        def kick(self):
                """Start the next queued job unless one is still active."""
                if self.active_job is None and self.active_jobs:
                        self.active_job = self.active_jobs.pop(0)
                        self.active_job.start(self.jobDone)

        def jobDone(self, job, task, problems):
                """Completion callback of the active job.

                problems is the list of unmet conditions; only the first one is
                reported.  A recoverable problem asks the user whether to
                retry, any other problem marks the job as failed.
                """
                print("job %s completed with %s in %s" % (job, problems, task))
                from Tools import Notifications
                if self.in_background:
                        from Screens.TaskView import JobView
                        self.in_background = False
                        Notifications.AddNotification(JobView, self.active_job)
                if problems:
                        from Screens.MessageBox import MessageBox
                        if problems[0].RECOVERABLE:
                                # errorCB is called later with the user's answer
                                Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
                        else:
                                Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
                                self.errorCB(False)
                        return

                self.active_job = None
                self.kick()

        def errorCB(self, answer):
                """Retry the active job if answer is true, else record it as failed."""
                if answer:
                        print("retrying job")
                        self.active_job.retry()
                else:
                        print("not retrying job.")
                        self.failed_jobs.append(self.active_job)
                        self.active_job = None
                        self.kick()

        def getPendingJobs(self):
                """Return a new list with the active job (if any) and all queued jobs."""
                pending = [ ]
                if self.active_job:
                        pending.append(self.active_job)
                pending += self.active_jobs
                return pending
339 # some examples:
340 #class PartitionExistsPostcondition:
341 #       def __init__(self, device):
342 #               self.device = device
343 #
344 #       def check(self, task):
345 #               import os
346 #               return os.access(self.device + "part1", os.F_OK)
347 #
348 #class CreatePartitionTask(Task):
349 #       def __init__(self, device):
350 #               Task.__init__(self, _("Create Partition"))
351 #               self.device = device
352 #               self.setTool("/sbin/sfdisk")
353 #               self.args += ["-f", self.device + "disc"]
354 #               self.initial_input = "0,\n;\n;\n;\ny\n"
355 #               self.postconditions.append(PartitionExistsPostcondition(self.device))
356 #
357 #class CreateFilesystemTask(Task):
358 #       def __init__(self, device, partition = 1, largefile = True):
359 #               Task.__init__(self, _("Create Filesystem"))
360 #               self.setTool("/sbin/mkfs.ext")
361 #               if largefile:
362 #                       self.args += ["-T", "largefile"]
363 #               self.args.append("-m0")
364 #               self.args.append(device + "part%d" % partition)
365 #
366 #class FilesystemMountTask(Task):
367 #       def __init__(self, device, partition = 1, filesystem = "ext3"):
368 #               Task.__init__(self, _("Mounting Filesystem"))
369 #               self.setTool("/bin/mount")
370 #               if filesystem is not None:
371 #                       self.args += ["-t", filesystem]
372 #               self.args.append(device + "part%d" % partition)
373
class Condition:
        """Base class of the pre- and postconditions checked for a task."""
        # jobDone in this module offers a retry only when this is true
        RECOVERABLE = False

        def getErrorMessage(self, task):
                """Return a generic message naming this condition's and the task's class."""
                message = _("An unknown error occured!")
                condition_name = self.__class__.__name__
                task_name = task.__class__.__name__
                return message + " (%s @ task %s)" % (condition_name, task_name)
379
class WorkspaceExistsPrecondition(Condition):
        """Precondition: the workspace of the task's job must be writable."""
        def check(self, task):
                # os is not imported at module level, so import it here like
                # the other conditions in this file do
                import os
                return os.access(task.job.workspace, os.W_OK)
383
class DiskspacePrecondition(Condition):
        """Precondition: the job's workspace must have enough free space.

        diskspace_required is given in bytes; the space found by the last
        check() is kept in diskspace_available for the error message.
        """
        def __init__(self, diskspace_required):
                self.diskspace_required = diskspace_required
                self.diskspace_available = 0

        def check(self, task):
                import os
                try:
                        s = os.statvfs(task.job.workspace)
                except OSError:
                        return False
                # f_bavail is counted in units of f_frsize (the fragment size),
                # not f_bsize; fall back to f_bsize if f_frsize is reported as 0
                self.diskspace_available = (s.f_frsize or s.f_bsize) * s.f_bavail
                return self.diskspace_available >= self.diskspace_required

        def getErrorMessage(self, task):
                return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required // 1024 // 1024, self.diskspace_available // 1024 // 1024)
400
class ToolExistsPrecondition(Condition):
        """Precondition: the command of the task must be an accessible executable.

        An absolute task.cmd is checked directly.  Any other command is looked
        up in the directories of $PATH and finally in task.cwd.  After a
        successful lookup self.realpath holds the path that was found;
        otherwise it holds task.cmd, which is used in the error message.
        """
        def check(self, task):
                import os

                self.realpath = task.cmd
                if not task.cmd:
                        # no command configured at all
                        return False
                if task.cmd[0] == '/':
                        print("[Task.py][ToolExistsPrecondition] WARNING: usage of absolute paths for tasks should be avoided!")
                        return os.access(self.realpath, os.X_OK)
                search_dirs = os.environ.get('PATH', '').split(os.pathsep)
                if task.cwd is not None:
                        search_dirs.append(task.cwd + '/')
                for directory in search_dirs:
                        candidate = os.path.join(directory, task.cmd)
                        if os.access(candidate, os.X_OK):
                                self.realpath = candidate
                                return True
                return False

        def getErrorMessage(self, task):
                return _("A required tool (%s) was not found.") % (self.realpath)
421
class AbortedPostcondition(Condition):
        """Condition reported by Task.finish() when a task was aborted."""
        def getErrorMessage(self, task):
                """Return the fixed message; task is not used."""
                message = "Cancelled upon user request"
                return message
425
class ReturncodePostcondition(Condition):
        """Postcondition: the task's return code must be 0."""
        def check(self, task):
                """Return whether task.returncode equals 0."""
                returncode = task.returncode
                return returncode == 0
429
class FailedPostcondition(Condition):
        """Postcondition wrapping a stored result; met when it is None or 0."""
        def __init__(self, exception):
                # stored as given; shown via str() in the error message
                self.exception = exception

        def getErrorMessage(self, task):
                """Return the string form of the stored value; task is not used."""
                return str(self.exception)

        def check(self, task):
                """Return true when the stored value is None or equals 0."""
                outcome = self.exception
                if outcome is None:
                        return True
                return outcome == 0
437
438 #class HDDInitJob(Job):
439 #       def __init__(self, device):
440 #               Job.__init__(self, _("Initialize Harddisk"))
441 #               self.device = device
442 #               self.fromDescription(self.createDescription())
443 #
444 #       def fromDescription(self, description):
445 #               self.device = description["device"]
446 #               self.addTask(CreatePartitionTask(self.device))
447 #               self.addTask(CreateFilesystemTask(self.device))
448 #               self.addTask(FilesystemMountTask(self.device))
449 #
450 #       def createDescription(self):
451 #               return {"device": self.device}
452
# Module-level JobManager instance, created when this module is imported.
job_manager = JobManager()