""" Loads all puppet files in environment, parse them, and store the parsed data in the database. """ import hashlib import json import os import os.path import subprocess import time from sqlalchemy.sql import text import threading from threading import Lock, Thread from queue import Queue # import pyenc from pyenc.model import db from pyenc import model from typing import Union, Generator def with_lock(lock, proc): try: lock.acquire() proc() finally: lock.release() def call(proc, *args): proc(*args) path = Union[str, bytes] def find(path: path, **kvs) -> list[bytes]: """Wrapper around find(1).""" cmdline = ['find', path] for key, value in kvs.items(): cmdline.append(f'-{key}') cmdline.append(value) cmdline.append('-print0') cmd = subprocess.run(cmdline, capture_output=True, check=True) return (f for f in cmd.stdout.split(b'\0') if f) class PuppetParseError(Exception): def __init__(self, code, msg): super().__init__() self.code = code self.msg = msg def __repr__(self): return f'PuppetParserError({self.code}, {self.msg})' def __str__(self): return repr(self) def puppet_parse(file: path) -> bytes: with subprocess.Popen( ['puppet', 'parser', 'dump', '--format', 'json', file], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as cmd: if cmd.retuncode and cmd.returncode != 0: raise PuppetParseError(cmd.returncode, cmd.stderr.read().decode('UTF-8')) json_data = cmd.stdout.read() if (value := cmd.wait()) != 0: raise PuppetParseError(value, cmd.stderr.read().decode('UTF-8')) return json_data def parse_files(files: list[path]) -> Generator[model.PuppetFile]: for i, file in enumerate(files): try: stat = os.stat(file) last_modify = stat.st_mtime old_object = model.PuppetFile.query \ .where(model.PuppetFile.path == file) \ .first() if old_object and old_object.last_parse > last_modify: # file unchanged since our last parse, skip continue print(f'{i}/{len(files)}: {file}') if old_object: puppet_file = old_object else: puppet_file = model.PuppetFile(path=file) puppet_file.last_parse = time.time() # m.json = puppet_parse(file) yield puppet_file except PuppetParseError as err: # TODO cache error print('Error:', err) continue def interpret_file(json_data: dict) -> list[str]: """Find all classes in json-representation of file.""" top = json_data['^'] if top[0] == 'class': tmp = top[1]['#'] idx = tmp.index('name') return [tmp[idx + 1]] # print(tmp[idx + 1]) elif top[0] == 'block': ret_value = [] for element in top[1:]: if element['^'][0] == 'class': tmp = element['^'][1]['#'] idx = tmp.index('name') ret_value.append(tmp[idx + 1]) return ret_value else: return [] def enumerate_files(path_base, environment_name): path = os.path.join(path_base, environment.name) files = list(find(path, type='f', name='*.pp')) try: for puppet_file in parse_files(files): with open(puppet_file.path, 'rb') as f: checksum = hashlib.sha256(f.read()).hexdigest() # Returns puppet_file.path, relative to path_base puppet_file.path = os.path.relpath(puppet_file.path, path.encode('UTF-8')) # TODO does flask want the whole environment object? puppet_file.environment = environment.id puppet_file.checksum = checksum db.session.add(puppet_file) finally: db.session.commit() def run(path_base: path, environment_name: str): ### Ensure that we have oru environment environment = model.PuppetEnvironment.query.where(model.PuppetEnvironment.name == environment_name).first() if not environment: environment = model.PuppetEnvironment(name=environment_name) db.session.add(environment) # TODO does this update the environment object db.session.commit() ### Identify all puppet files, and note the base of their content # enumerate_files(path_base, environment_name) ### Find all puppet files which we haven't parsed result = db.engine.execute(text(""" SELECT f.id, f.path, f.last_parse, f.checksum, env.name FROM puppet_file f LEFT OUTER JOIN puppet_file_content c ON f.checksum = c.checksum LEFT JOIN puppet_environment env ON f.environment = env.id WHERE c.json IS NULL """)) # db_lock = Lock() threads = [] q = Queue() for (id, path, last, checksum, environment) in result: print(environment, path) # return full_path = os.path.join(path_base.encode('UTF-8'), environment.encode('UTF-8'), path) with open(full_path, 'rb') as f: current_checksum = hashlib.sha256(f.read()).hexdigest() if current_checksum != checksum: print(f'Checksum changed for {environment}/{path}') # db.engine.execute(model.PuppetFile.delete().where(model.PuppetFile.id == id)) continue thread = Thread(target=lambda checksum, full_path: (checksum, puppet_parse(full_path)), args=(checksum, full_path), name=f'{environment}/{path}') thread.start() threads.append(thread) try: # for thread in threads: # print(f'Waiting on {thread.name}') # thread.join() # print(f'{thread.name} joined') while not q.empty(): print('Getting something from queue') (checksum, item) = q.get() print(checksum) pfc = model.PuppetFileContent(checksum=checksum, json=item) db.session.add(pfc) q.task_done() finally: db.session.commit() return try: for puppet_file in model.PuppetFile.query.all(): try: class_names = interpret_file(json.loads(os.path.join(path, puppet_file.json))) for class_name in class_names: db.session.add(model.PuppetClass( class_name=class_name, comes_from=puppet_file)) except Exception as e: print(e) print(f'Failed: {puppet_file.path}') finally: db.session.commit()