Package zephir :: Package monitor :: Package agentmanager :: Module zephirservice
[frames | no frames]

Source Code for Module zephir.monitor.agentmanager.zephirservice

  1  # -*- coding: UTF-8 -*- 
  2  ########################################################################### 
  3  # Eole NG - 2007 
  4  # Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon) 
  5  # Licence CeCill  cf /root/LicenceEole.txt 
  6  # eole@ac-dijon.fr 
  7  ########################################################################### 
  8   
  9  """ 
 10  Services Twisted de collection et de publication de données. 
 11  """ 
 12   
 13  import locale, gettext, os, pwd, shutil, random 
 14  from glob import glob 
 15   
 16  # install locales early 
 17  from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR 
 18  APP = 'zephir-agents' 
 19  DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n') 
 20  gettext.install(APP, DIR, unicode=False) 
 21   
 22   
 23  from twisted.application import internet, service 
 24  from twisted.internet import utils, reactor 
 25  from twisted.web import resource, server, static, util, xmlrpc 
 26   
 27  from zephir.monitor.agentmanager import config as cfg 
 28  from zephir.monitor.agentmanager.util import ensure_dirs, md5file, md5files, log 
 29  from zephir.monitor.agentmanager.web_resources import ZephirServerResource 
 30  from zephir.monitor.agentmanager.clientmanager import ClientManager 
 31   
 32  try: 
 33      from creole.fonctionseole import get_module_name 
 34      mod_name = get_module_name() 
 35  except: 
 36      mod_name = "" 
 37   
 38  try: 
 39      import zephir.zephir_conf.zephir_conf as conf_zeph 
 40      from zephir.lib_zephir import zephir_proxy, convert, zephir_dir, update_sudoers 
 41      from zephir.lib_zephir import log as zeph_log 
 42      registered = 1 
 43  except: 
 44      # serveur non enregistré sur zephir 
 45      registered = 0 
 46   
class ZephirService(service.MultiService):
    """Main Twisted service for Zephir apps.

    Holds the merged configuration and the root web resource, and offers
    chainable factory methods (each returns self) that attach the updater
    and/or publisher subservices.
    """

    def __init__(self, config, root_resource=None, serve_static=False):
        """Build the service.

        config -- mapping of settings; it is laid over a copy of
                  cfg.DEFAULT_CONFIG, so missing keys get default values.
        root_resource -- web resource to attach children to.  When None, a
                  new resource is created and served by a TCP server on
                  config['webserver_port'].
        serve_static -- when true, config['static_web_dir'] is published
                  as the 'static' child of the root resource.
        """
        service.MultiService.__init__(self)
        self.config = cfg.DEFAULT_CONFIG.copy()
        self.config.update(config)
        self.updater = self.publisher = None
        # refresh the client scripts declared in sudoers
        if registered:
            update_sudoers()
        # parent web server
        if root_resource is None:
            self.root_resource = resource.Resource()
            webserver = internet.TCPServer(self.config['webserver_port'],
                                           server.Site(self.root_resource))
            webserver.setServiceParent(service.IServiceCollection(self))
        else:
            self.root_resource = root_resource
        # serve global static files
        if serve_static:
            self.root_resource.putChild('static',
                                        static.File(self.config['static_web_dir']))

    # subservices factory methods

    def with_updater(self):
        """Attach an UpdaterService; may only be done once. Returns self."""
        assert self.updater is None
        self.updater = UpdaterService(self.config, self, self.root_resource)
        return self

    def with_publisher(self):
        """Attach a PublisherService; may only be done once. Returns self."""
        assert self.publisher is None
        self.publisher = PublisherService(self.config, self, self.root_resource)
        return self

    # NOTE(review): the `def` line of this method was missing from the
    # listing this file was recovered from; the name below is reconstructed
    # -- confirm it against the upstream source and the callers.
    def with_updater_and_publisher(self):
        """Attach both subservices, the publisher reading the live agents.

        The publisher is created without the clients page and is handed the
        updater's agents, keyed by config['host_ref'].  Returns self.
        """
        assert self.updater is None
        assert self.publisher is None
        self.updater = UpdaterService(self.config, self, self.root_resource)
        self.publisher = PublisherService(self.config, self, self.root_resource,
                                          show_clients_page=False,
                                          live_agents={self.config['host_ref']: self.updater.agents})
        return self
93 94 95 96
97 -class UpdaterService(service.MultiService, xmlrpc.XMLRPC):
98 """Schedules measures, data serialisation and upload.""" 99
100 - def __init__(self, config, parent, root_resource):
101 """config should be complete""" 102 service.MultiService.__init__(self) 103 xmlrpc.XMLRPC.__init__(self) 104 self.config = config 105 self.module = "" 106 # detect Eole module 107 if os.path.exists('/etc/eole/version'): 108 f = file('/etc/eole/version') 109 self.module = f.read().split('\n')[0] 110 f.close() 111 # updates site.cfg file 112 self.update_static_data() 113 # start subservices 114 loc, enc = locale.getdefaultlocale() 115 log.msg(_('default locale: %s encoding: %s') % (loc, enc)) 116 if enc == 'utf': 117 log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % enc) 118 self.agents = self.load_agents() 119 # attach to parent service 120 self.setServiceParent(service.IServiceCollection(parent)) 121 root_resource.putChild('xmlrpc', self)
122
123 - def startService(self):
124 """initialize zephir services""" 125 service.MultiService.startService(self) 126 reactor.callLater(2,self.schedule_all) 127 if registered != 0: 128 # on est enregistré sur zephir => initiation de 129 # la création et l'envoi d'archives 130 self.setup_uucp() 131 # dans le cas ou un reboot a été demandé, on indique que le redémarrage est bon 132 if os.path.isfile(os.path.join(zephir_dir,'reboot.lck')): 133 try: 134 zeph_log('REBOOT',0,'redémarrage du serveur terminé') 135 os.unlink(os.path.join(zephir_dir,'reboot.lck')) 136 except: 137 pass
138
139 - def load_agents(self):
140 """Charge tous les agents du répertoire de configurations.""" 141 log.msg(_("Loading agents from %s...") % self.config['config_dir']) 142 loaded_agents = {} 143 list_agents = glob(os.path.join(self.config['config_dir'], "*.agent")) 144 list_agents.extend(glob(os.path.join(self.config['config_dir'],self.module,"*.agent"))) 145 for f in list_agents: 146 log.msg(_(" from %s:") % os.path.basename(f)) 147 h = { 'AGENTS': None } 148 execfile(f, globals(), h) 149 assert h.has_key('AGENTS') 150 for a in h['AGENTS']: 151 assert not loaded_agents.has_key(a.name) 152 # init agent data and do a first archive 153 a.init_data(os.path.join(self.config['state_dir'], 154 self.config['host_ref'], 155 a.name)) 156 a.manager = self 157 a.archive() 158 loaded_agents[a.name] = a # /!\ écrasement des clés 159 log.msg(_(" %s, period %d") % (a.name, a.period)) 160 log.msg(_("Loaded.")) 161 return loaded_agents
162 163 164 # scheduling measures 165
166 - def schedule(self, agent_name):
167 """Planifie les mesures périodiques d'un agent.""" 168 assert self.agents.has_key(agent_name) 169 if self.agents[agent_name].period > 0: 170 timer = internet.TimerService(self.agents[agent_name].period, 171 self.wakeup_for_measure, agent_name) 172 timer.setName(agent_name) 173 timer.setServiceParent(service.IServiceCollection(self))
174 175
176 - def wakeup_for_measure(self, agent_name):
177 """Callback pour les mesures planifiées.""" 178 assert self.agents.has_key(agent_name) 179 # log.debug("Doing scheduled measure on " + agent_name) 180 self.agents[agent_name].scheduled_measure()
181 182
183 - def schedule_all(self):
184 """Planifie tous les agents chargés. 185 Démarre le cycle de mesures périodiques de chaque agent 186 chargé. La première mesure est prise immédiatement. 187 """ 188 for agent_name in self.agents.keys(): 189 # charge les actions disponibles (standard en premier, puis les actions locales) 190 # les actions locales écrasent les actions standard si les 2 existent 191 for action_dir in (os.path.join(self.config['action_dir'],'eole'), self.config['action_dir']): 192 f_actions = os.path.join(action_dir, "%s.actions" % agent_name) 193 if os.path.isfile(f_actions): 194 actions = {} 195 execfile(f_actions, globals(), actions) 196 for item in actions.keys(): 197 if item.startswith('action_'): 198 setattr(self.agents[agent_name], item, actions[item]) 199 # self.wakeup_for_measure(agent_name) # first measure at launch 200 self.schedule(agent_name)
201 202
203 - def timer_for_agent_named(self, agent_name):
204 assert self.agents.has_key(agent_name) 205 return self.getServiceNamed(agent_name)
206 207 208 # data upload to zephir server 209
210 - def setup_uucp(self):
211 ensure_dirs(self.config['uucp_dir']) 212 self.update_static_data() 213 # récupération du délai de connexion à zephir 214 try: 215 reload(conf_zeph) 216 # supression des éventuels répertoires de stats invalides 217 # par ex, en cas de désinscription zephir 'manuelle'. 218 219 # sur zephir : on garde toujours 0 pour éviter les conflits avec les serveurs enregistrés 220 if not mod_name.startswith('zephir'): 221 for st_dir in os.listdir(self.config['state_dir']): 222 if st_dir != str(conf_zeph.id_serveur): 223 shutil.rmtree(os.path.join(self.config['state_dir'],st_dir)) 224 # vérification sur zephir du délai de connexion 225 period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1]) 226 except: 227 period = 0 228 229 if period < 30: 230 period = self.config['upload_period'] 231 log.msg(_('Using default period : %s seconds') % period) 232 # on ajoute un décalage aléatoire (entre 30 secondes et period) au premier démarrage 233 # (pour éviter trop de connexions simultanées si le service est relancé par crontab) 234 delay = random.randrange(30,period) 235 reactor.callLater(delay,self.wakeup_for_upload)
236
237 - def update_static_data(self):
238 original = os.path.join(self.config['config_dir'], 'site.cfg') 239 if os.path.isfile(original): 240 destination = cfg.client_data_dir(self.config, self.config['host_ref']) 241 ensure_dirs(destination) 242 need_copy = False 243 try: 244 org_mtime = os.path.getmtime(original) 245 dest_mtime = os.path.getmtime(os.path.join(destination, 'site.cfg')) 246 except OSError: 247 need_copy = True 248 if need_copy or (org_mtime > dest_mtime): 249 shutil.copy(original, destination)
250
251 - def wakeup_for_upload(self, recall=True):
252 # relecture du délai de connexion sur zephir 253 try: 254 reload(conf_zeph) 255 period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1]) 256 except: 257 period = 0 258 # on relance la fonction dans le délai demandé 259 if period < 30: 260 period = self.config['upload_period'] 261 log.msg(_('Using default period : %s seconds') % period) 262 # on ajoute un décalage au premier démarrage 263 # (pour éviter trop de connexions simultanées si le service est relancé par crontab) 264 if recall: 265 reactor.callLater(period,self.wakeup_for_upload) 266 267 # virer l'ancienne archive du rép. uucp 268 for agent in self.agents.values(): 269 agent.archive() 270 # agent.reset_max_status() 271 self.update_static_data() 272 # archiver dans rép. uucp, donner les droits en lecture sur l'archive 273 try: 274 assert conf_zeph.id_serveur != 0 275 client_dir = os.path.join(self.config['tmp_data_dir'],str(conf_zeph.id_serveur)) 276 except: 277 client_dir = os.path.join(self.config['tmp_data_dir'],self.config['host_ref']) 278 try: 279 # purge du répertoire temporaire 280 if os.path.isdir(client_dir): 281 shutil.rmtree(client_dir) 282 os.makedirs(client_dir) 283 except: # non existant 284 pass 285 args = ['-Rf',os.path.abspath(os.path.join(cfg.client_data_dir(self.config, self.config['host_ref']),'site.cfg'))] 286 ignore_file = os.path.abspath(os.path.join(self.config['state_dir'],'ignore_list')) 287 if os.path.exists(ignore_file): 288 args.append(ignore_file) 289 # on ne copie que les données des agents instanciés 290 # cela évite de remonter par exemple les stats rvp si le service a été désactivé 291 for agent_name in self.agents.keys(): 292 args.append(os.path.abspath(cfg.agent_data_dir(self.config, self.config['host_ref'],agent_name))) 293 args.append(os.path.abspath(client_dir)) 294 res = utils.getProcessOutput('/bin/cp', args = args) 295 res.addCallbacks(self._make_archive, 296 lambda x: log.msg(_("/!\ copy failed (%s)\n" 297 "data: %s") 298 % (x, 
self.config['state_dir'])))
299
300 - def _check_md5(self):
301 # calcul de sommes md5 pour config.eol et les patchs 302 rep_src = "/etc/eole" 303 data = [] 304 for src, dst, pattern in md5files[cfg.distrib_version]: 305 if src == 'variables.eol': 306 # cas particulier : variables.eol, on génère le fichier à chaque fois 307 orig_eol = os.path.join(rep_src,'config.eol') 308 var_eol = os.path.join(rep_src,'variables.eol') 309 f_var = file(var_eol,'w') 310 if os.path.isfile(orig_eol): 311 for line in file(orig_eol).readlines(): 312 if not "valprec =" in line: 313 f_var.write(line) 314 f_var.close() 315 if os.path.isdir(os.path.join(rep_src,src)): 316 fics = os.listdir(os.path.join(rep_src,src)) 317 fics = [(os.path.join(src,fic),os.path.join(dst,fic)) for fic in fics] 318 else: 319 fics = [(src,dst)] 320 for fic, fic_dst in fics: 321 if os.path.isfile(os.path.join(rep_src,fic)): 322 if (pattern is None) or fic.endswith(pattern): 323 md5res = md5file(os.path.join(rep_src,fic)) 324 data.append("%s %s\n" % (md5res, fic_dst)) 325 try: 326 assert conf_zeph.id_serveur != 0 327 outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % str(conf_zeph.id_serveur)), "w") 328 except: 329 outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % self.config['host_ref']), "w") 330 outf.writelines(data) 331 outf.close()
332
333 - def _get_packages(self, *args):
334 """génère une liste des paquets installés 335 """ 336 try: 337 assert conf_zeph.id_serveur != 0 338 cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % str(conf_zeph.id_serveur))) 339 except: 340 cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % self.config['host_ref'])) 341 os.system(cmd_pkg)
342
343 - def _make_archive(self,*args):
344 self._check_md5() 345 self._get_packages() 346 # compression des données à envoyer 347 try: 348 assert conf_zeph.id_serveur != 0 349 tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % str(conf_zeph.id_serveur)) 350 except: 351 tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % self.config['host_ref']) 352 tar_cwd = os.path.dirname(os.path.abspath(self.config['tmp_data_dir'])) 353 tar_dir = os.path.basename(os.path.abspath(self.config['tmp_data_dir'])) 354 res = utils.getProcessOutput('/bin/tar', 355 args = ('czf', tarball, 356 '--exclude', 'private', 357 '-C', tar_cwd, 358 tar_dir)) 359 res.addCallbacks(self._try_chown, 360 lambda x: log.msg(_("/!\ archiving failed (%s)\n" 361 "data: %s\narchive: %s") 362 % (x, self.config['state_dir'], tarball)), 363 callbackArgs = [tarball])
364
365 - def _try_chown(self, tar_output, tarball):
366 try: 367 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4] 368 uid = os.getuid() 369 os.chown(tarball, uucp_uid, uucp_gid) # only change group id so that uucp can read while we can still write 370 except OSError, e: 371 log.msg("/!\ chown error, check authorizations (%s)" % e) 372 # upload uucp 373 # on fait également un chown sur le fichier deffered_logs au cas ou il serait en root 374 try: 375 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4] 376 os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid) 377 except: 378 log.msg("/!\ chown error on deffered_logs") 379 os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')
380 381 382 # xmlrpc methods 383
384 - def xmlrpc_list_agents(self):
385 """@return: Liste des agents chargés""" 386 return self.agents.keys()
387 xmlrpc_list_agents.signature = [['array']] 388
389 - def xmlrpc_agents_menu(self):
390 """@return: Liste des agents chargés et structure d'affichage""" 391 try: 392 menu = {} 393 for name, agent in self.agents.items(): 394 if agent.section != None: 395 if not menu.has_key(agent.section): 396 menu[agent.section] = [] 397 menu[agent.section].append((name, agent.description)) 398 return menu 399 except Exception, e: 400 log.msg(e)
401 xmlrpc_agents_menu.signature = [['struct']] 402
403 - def xmlrpc_status_for_agents(self, agent_name_list = []):
404 """ 405 @return: Les statuts des agents listés dans un dictionnaire 406 C{{nom:status}}. Le status est lui-même un dictionnaire avec 407 pour clés C{'level'} et C{'message'}. Seuls les noms d'agents 408 effectivement chargés apparaîtront parmi les clés du 409 dictionnaire. 410 """ 411 result = {} 412 if len(agent_name_list) == 0: 413 agent_name_list = self.agents.keys() 414 for agent_name in agent_name_list: 415 if self.agents.has_key(agent_name): 416 result[agent_name] = self.agents[agent_name].check_status().to_dict() 417 return result
418 xmlrpc_status_for_agents.signature = [['string', 'struct']] 419
420 - def xmlrpc_reset_max_status_for_agents(self, agent_name_list=[]):
421 if len(agent_name_list) == 0: 422 agent_name_list = self.agents.keys() 423 for agent_name in agent_name_list: 424 if self.agents.has_key(agent_name): 425 self.agents[agent_name].reset_max_status() 426 return "ok"
427
429 self.wakeup_for_upload(False) 430 return "ok"
431 432
class PublisherService(service.MultiService):
    """Serves the web interface for current agent data"""

    def __init__(self, config, parent, root_resource,
                 live_agents=None,
                 show_clients_page=True):
        """Create the client manager and publish the agents pages.

        config -- settings mapping; it should be complete.
        parent -- object adapted with service.IServiceCollection; this
                  service becomes one of its children.
        root_resource -- web resource receiving the 'agents' child and the
                  redirection of its empty path.
        live_agents -- passed as-is to ClientManager.
        show_clients_page -- when false, the empty path redirects to this
                  host's own page instead of the agents index.
        """
        service.MultiService.__init__(self)
        self.config = config
        self.show_clients_page = show_clients_page
        self.manager = ClientManager(self.config, live_agents)
        # attach to parent service
        self.setServiceParent(service.IServiceCollection(parent))
        # run webserver
        agents_resource = ZephirServerResource(self.config, self.manager)
        root_resource.putChild('agents', agents_resource)
        if self.show_clients_page:
            landing = './agents/'
        else:
            landing = './agents/' + self.config['host_ref'] + '/'
        root_resource.putChild('', util.Redirect(landing))

# TODO
# update resources: loading host structures, manager -> agent dict
# connect publisher and updater to zephir service (web server, config...)

# client manager: list of host_ref, {host_ref => agent_manager}
# agent manager: structure, {name => agent_data}