HEX
Server: Apache
System: Linux box5514.bluehost.com 5.14.0-162.23.1.9991722448259.nf.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jul 31 18:11:45 UTC 2024 x86_64
User: smqcdvmy (3893)
PHP: 8.3.30
Disabled: NONE
Upload Files
File: //usr/libexec/oracle-cloud-agent/ocatools/diagnostic
#!/usr/bin/env python
import grp
import tarfile
import time
import re
from json import dump
from os import environ, listdir, popen, unlink, path, stat
from pwd import getpwnam
from stat import S_IMODE

# Export TZ=UTC in this process's environment; commands started later via
# popen() inherit it.
# NOTE(review): time.tzset() is never called, so whether the
# time.localtime() call used for the archive name already reflects UTC is
# platform-dependent -- confirm.
environ['TZ'] = 'UTC'


class Tar:
    """Gzip-compressed tar archive that gathers the diagnostic output.

    Calling the instance runs every operation in ``ops`` as
    ``operation(self, cmds, checks)``, then stores the two result
    dictionaries in the archive as ``cmd-result.json`` and
    ``check-result.json``.
    """

    def __init__(self, filename, ops):
        """Open ``filename`` for writing and remember the operations to run."""
        # Bound before tarfile.open() so close()/__del__ stay safe if it raises.
        self.tar = None
        self.filename = filename
        self.ops = ops
        self.tar = tarfile.open(self.filename, 'w:gz')

    def __call__(self):
        """Run all operations and append their JSON reports to the archive.

        The reports are written to the current working directory under
        fixed names, added to the archive, and removed again even when an
        error interrupts the process.
        """
        cmds = {}
        checks = {}
        fname = 'cmd-result.json'
        cname = 'check-result.json'
        for operation in self.ops:
            operation(self, cmds, checks)
        try:
            for report_name, report in ((fname, cmds), (cname, checks)):
                # Close (and thereby flush) the report before archiving it so
                # the archived copy is complete on every interpreter.
                with open(report_name, 'w') as handle:
                    dump(report,
                         handle,
                         sort_keys=True,
                         indent=4,
                         separators=(',', ': '))
                self.tar.add(report_name)
        finally:
            for report_name in (fname, cname):
                if path.exists(report_name):
                    unlink(report_name)

    def add(self, filename):
        """Add ``filename`` to the archive."""
        self.tar.add(filename)

    def close(self):
        """Finalize the archive; calling it more than once is harmless."""
        archive = getattr(self, 'tar', None)
        self.tar = None
        if archive is not None:
            archive.close()

    def __del__(self):
        # Kept so that ``del tar_obj`` still finalizes the archive.
        self.close()


class Copydir:
    """Operation that archives every entry found directly in a directory.

    The directory is listed once, at construction time. A directory that
    is missing or unreadable yields an empty file list instead of aborting
    the whole collection.
    """

    def __init__(self, directory):
        try:
            names = listdir(directory)
        except OSError:
            # Nothing to collect; keep the rest of the diagnostics running.
            names = []
        self.files = [directory + '/' + name for name in names]

    def __call__(self, tar, cmdDict, checkDict):
        """Add each listed entry that still exists to ``tar``."""
        for fname in self.files:
            # Entries may have been rotated away since construction.
            # lexists() keeps dangling symlinks, which tar can still store.
            if path.lexists(fname):
                tar.add(fname)


class Copyfile:
    """Operation that archives a single file when it is present."""

    def __init__(self, filename):
        self.filename = filename

    def __call__(self, tar, cmdDict, checkDict):
        """Add the symlink-resolved path of the file to ``tar`` if it exists."""
        if not path.exists(self.filename):
            return
        resolved = path.realpath(self.filename)
        tar.add(resolved)


class Cmd:
    """Operation that runs a shell command and records its stripped stdout.

    The output is stored in the command dictionary under the command
    string itself. With ``sudo=True`` the command is prefixed with
    ``/usr/bin/sudo``.
    """

    def __init__(self, cmd, sudo=False):
        self.cmd = cmd
        self.sudo = '/usr/bin/sudo' if sudo else ''

    def __call__(self, tar, cmdDict, checkDict):
        """Run the command through the shell and store its output."""
        command = self.sudo + ' ' + self.cmd
        # Closing the pipe explicitly waits for the child, so no unreaped
        # process or open descriptor is left behind.
        with popen(command) as pipe:
            output = pipe.read()
        cmdDict[self.cmd] = output.strip()


class Check:
    """Operation that stores the result of a zero-argument check function.

    The value returned by ``function`` is saved in the check dictionary
    under ``key``. If the function raises, ``error_repr`` is stored
    instead, or ``repr()`` of the exception when ``error_repr`` is None.
    """

    def __init__(self, function, key, error_repr=None):
        self.function = function
        self.key = key
        self.error_repr = error_repr

    def __call__(self, tar, cmdDict, checkDict):
        """Evaluate the check and record its outcome in ``checkDict``."""
        try:
            outcome = self.function()
        except Exception as exc:
            if self.error_repr is not None:
                outcome = self.error_repr
            else:
                outcome = repr(exc)
        checkDict[self.key] = outcome


class Pmap:
    """Operation that records ``pmap -x`` output for processes by name.

    At construction time the command line of every numeric entry under
    ``/proc`` is read into ``self.pids`` (pid string -> raw cmdline).
    Processes that exit, or whose cmdline cannot be read or decoded, while
    the snapshot is taken are skipped.
    """

    def __init__(self, process_name=None):
        self.process_name = process_name
        self.pids = {}
        for entry in listdir('/proc'):
            # Only directories named after a pid are of interest.
            if not '0' <= entry[:1] <= '9':
                continue
            try:
                with open('/proc/%s/cmdline' % entry) as cmdline:
                    self.pids[entry] = cmdline.read()
            except (IOError, OSError, UnicodeError):
                # The process went away or is unreadable; ignore it.
                continue

    def __call__(self, tar, cmdDict, checkDict):
        """Run ``pmap -x`` via sudo for every process matching the name."""
        for pid, cmdline in self.pids.items():
            # argv[0] is the first NUL-separated field; compare its basename.
            pname = cmdline.split('\000')[0].split('/')[-1]
            if pname == self.process_name:
                cmd = 'pmap -x %s # pmap for %s' % (pid, pname)
                Cmd(cmd, True)(tar, cmdDict, checkDict)


class RunState(Pmap):
    """Operation that records the ``State:`` line of matching processes.

    A process matches when ``match`` occurs anywhere in its raw command
    line. The combined result is stored in the command dictionary under
    ``Internal(<cmdSuffix>)``.
    """

    def __init__(self, match, cmdSuffix='RunState'):
        Pmap.__init__(self)
        self.match = match
        self.cmdSuffix = cmdSuffix

    def __call__(self, tar, cmdDict, checkDict):
        """Collect ``<cmdline> <State: line>`` for every matching process."""
        found = []
        for pid, cmdline in self.pids.items():
            if self.match not in cmdline:
                continue
            try:
                with open('/proc/' + pid + '/status') as status:
                    for line in status:
                        if line.startswith('State:'):
                            found.append(cmdline + ' ' + line)
            except (IOError, OSError):
                # The process exited after the snapshot was taken; skip it
                # rather than aborting the whole collection.
                continue
        cmdDict['Internal(%s)' % self.cmdSuffix] = ''.join(found)


class If:
    """Conditional operation: run ``then_`` or ``else_`` depending on ``cond_``.

    ``cond_`` is either a plain value, tested for truthiness, or a callable
    invoked as ``cond_(tar, cmdDict, checkDict)``. The result of the chosen
    branch is returned; the default ``else_`` does nothing.
    """

    def __init__(self,
                 cond_,
                 then_,
                 else_=lambda tar, cmdDict, checkDict: None):
        self.cond_ = cond_
        self.then_ = then_
        self.else_ = else_

    def __call__(self, tar, cmdDict, checkDict):
        """Evaluate the condition once and dispatch to the selected branch."""
        verdict = self.cond_
        if callable(verdict):
            verdict = verdict(tar, cmdDict, checkDict)
        branch = self.then_ if verdict else self.else_
        return branch(tar, cmdDict, checkDict)


class Progn:
    """Composite operation that runs its child operations in order."""

    def __init__(self, *ops):
        self.ops = ops

    def __call__(self, tar, cmdDict, checkDict):
        """Invoke every child with the shared arguments; results are dropped."""
        for step in self.ops:
            step(tar, cmdDict, checkDict)


class Or:
    """Short-circuiting disjunction over values and callables.

    Each operand is either a plain value or a callable invoked as
    ``op(tar, cmdDict, checkDict)``. The first truthy result is returned;
    ``False`` is returned when no operand is truthy.
    """

    def __init__(self, *ops):
        self.ops = ops

    def __call__(self, tar, cmdDict, checkDict):
        """Return the first truthy operand value, evaluating left to right."""
        for candidate in self.ops:
            if callable(candidate):
                value = candidate(tar, cmdDict, checkDict)
            else:
                value = candidate
            if value:
                return value
        return False


def correct_ownership_p(owner, group, directory):
    """Report whether a path is owned by the given user and group.

    The owner is resolved through the passwd database and the group
    through the group database, so a group is recognised even when no
    user account of the same name exists.

    Returns 'expected' when both the uid and gid of ``directory`` match,
    otherwise 'unexpected'. Raises OSError if the path cannot be stat'ed
    and KeyError if the user (or, once the uid matches, the group) is
    unknown.
    """
    dirstat = stat(directory)
    # The group is only looked up once the owner already matches.
    if (getpwnam(owner).pw_uid == dirstat.st_uid
            and grp.getgrnam(group).gr_gid == dirstat.st_gid):
        return 'expected'

    return 'unexpected'


def group_available(login_defs='/etc/login.defs'):
    """Check whether at least one system GID is still unassigned.

    The system GID range is taken from ``SYS_GID_MIN`` / ``SYS_GID_MAX``
    in ``login_defs``; only lines consisting of exactly two fields are
    considered. When a key occurs several times the widest range wins.
    Missing values default to 101 for the minimum and ``GID_MIN - 1``
    (``GID_MIN`` itself defaulting to 1000) for the maximum.

    Returns 'expected' if some GID in the range has no group entry,
    'unexpected(all GIDs used)' if every GID is taken, and
    'unexpected(login.defs configured incorrectly)' if the maximum is not
    above the minimum. Raises IOError/OSError when the file cannot be
    read and ValueError when a relevant value is not an integer.
    """
    # Sentinels marking "not configured" for maxima and minima.
    unset_max = -1
    unset_min = 123456789

    sys_gid_max = unset_max
    sys_gid_min = unset_min
    gid_min = unset_min

    with open(login_defs) as defs:
        for raw in defs:
            fields = raw.split()
            if len(fields) != 2:
                continue
            key, value = fields
            if key == 'SYS_GID_MAX':
                sys_gid_max = max(sys_gid_max, int(value))
            elif key == 'SYS_GID_MIN':
                sys_gid_min = min(sys_gid_min, int(value))
            elif key == 'GID_MIN':
                gid_min = min(gid_min, int(value))

    if gid_min == unset_min:
        gid_min = 1000
    if sys_gid_max == unset_max:
        sys_gid_max = gid_min - 1
    if sys_gid_min == unset_min:
        sys_gid_min = 101

    if sys_gid_max <= sys_gid_min:
        return 'unexpected(login.defs configured incorrectly)'

    # Scan from the top of the range downwards.
    for gid in range(sys_gid_max, sys_gid_min - 1, -1):
        try:
            grp.getgrgid(gid)
        except KeyError:
            # No group owns this GID, so it is free.
            return 'expected'

    return 'unexpected(all GIDs used)'


def clock_skew():
    """Scan the agent log for a clock-skew message.

    Returns 'detected' as soon as a line containing the marker text is
    found, otherwise 'not detected'. Errors opening the log propagate to
    the caller.
    """
    marker = 'is not within allowed clock skew'
    with open('/var/log/oracle-cloud-agent/agent.log', 'r') as log:
        # any() stops reading at the first matching line.
        hit = any(marker in entry for entry in log)
    return 'detected' if hit else 'not detected'


def check_output(cmd, expected_output):
    """Run ``cmd`` in a shell and compare its stdout with ``expected_output``.

    Both sides are compared after stripping surrounding whitespace.
    Returns "expected" on a match and "unexpected" otherwise.
    """
    # Close the pipe explicitly so the child process is waited for.
    with popen(cmd) as pipe:
        actual = pipe.read().strip()
    return "expected" if actual == expected_output.strip() else "unexpected"


def check_prelinking():
    """Report whether the prelink package is installed and configured.

    Runs ``rpm -q prelink``. If the output does not start with
    ``prelink-`` the result is 'not detected'. Otherwise the result says
    whether check_prelink_config() accepted the agent's prelink
    configuration file.
    """
    # Close the pipe explicitly so the rpm child process is waited for.
    with popen('rpm -q prelink') as pipe:
        prelink_output = pipe.read().strip()
    if not prelink_output.startswith('prelink-'):
        return 'not detected'
    if check_prelink_config():
        return 'Detected and config file is error free'
    return 'Detected and config file have errors'


def check_prelink_config(
        filename='/etc/prelink.conf.d/oracle-cloud-agent-prelink.conf'):
    """Check that the prelink configuration excludes the agent directories.

    Returns True only when ``filename`` can be read and contains both
    required ``-b`` lines verbatim, each as a complete line. An unreadable,
    missing or undecodable file counts as False.
    """
    required = ('-b /usr/lib{,64}/oracle-cloud-agent',
                '-b /usr/libexec{,64}/oracle-cloud-agent')
    try:
        with open(filename, 'r') as config:
            entries = set(config.read().splitlines())
    except (IOError, OSError, UnicodeError):
        return False
    return all(line in entries for line in required)


def check_cmd_output(cmd, string):
    """Return True when the stripped stdout of ``cmd`` equals ``string``.

    ``cmd`` is run through the shell; ``string`` is compared as given,
    without stripping.
    """
    # Close the pipe explicitly so the child process is waited for.
    with popen(cmd) as pipe:
        return pipe.read().strip() == string


# Ordered list of collection steps. Tar.__call__ invokes every entry as
# operation(tar, cmds, checks); Cmd entries fill the command dictionary,
# Check entries fill the check dictionary, and Copydir/Copyfile entries add
# files to the archive.
#
# The entries are constructed when this list is built, i.e. at module load:
# Copydir lists its directory and Pmap/RunState read /proc at that point.
# Order matters: the If(...) entry near the end reads cmds['uname -a'],
# which only exists once Cmd('uname -a') has run.
operations = [
    # Archive every entry of the agent log directory.
    Copydir('/var/log/oracle-cloud-agent'),
    # General host information.
    Cmd('uname -a'),
    Cmd('uptime'),
    Cmd('sestatus'),
    Cmd('sysctl crypto.fips_enabled'),
    # Release and agent configuration files (each skipped when absent).
    Copyfile('/etc/system-release'),
    Copyfile('/etc/oracle-cloud-agent/agent.yml'),
    Copyfile('/etc/oracle-cloud-agent/updater.yml'),
    Copyfile('/var/lib/oracle-cloud-agent/pool2/install.log'),
    # Package information and verification for the agent RPM.
    Cmd('rpm -qi oracle-cloud-agent'),
    Cmd('rpm -V oracle-cloud-agent'),

    # Check the service status. Connect stderr to stdout so that OL6 users
    # don't see the warning message for systemctl not found.
    Cmd('systemctl is-enabled oracle-cloud-agent-updater.service 2>&1'),
    Cmd('systemctl is-active oracle-cloud-agent-updater.service 2>&1'),
    Cmd('systemctl is-enabled oracle-cloud-agent.service 2>&1'),
    Cmd('systemctl is-active oracle-cloud-agent.service 2>&1'),
    Cmd('ps auxww'),
    # Validate the sudoers configuration; this command is run through sudo.
    Cmd('visudo -c 2>&1', True),
    Cmd('grep oracle-cloud-agent /etc/passwd'),
    Cmd('grep adm /etc/passwd'),
    # Last 100 lines of each agent log, stored alongside the command output.
    Cmd('tail -100 /var/log/oracle-cloud-agent/agent.log'),
    Cmd('tail -100 /var/log/oracle-cloud-agent/updater.log'),
    # Fetch the /opc/v1/instance/ document from 169.254.169.254
    # (presumably the instance metadata service -- confirm).
    Cmd('curl -s -H "Authorization: Bearer Oracle" http://169.254.169.254/opc/v1/instance/'
       ),
    # Permission checks: the mode bits must equal the given octal value.
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log').st_mode) == 0o755 else "unexpected",
        '/var/log directory permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent').st_mode) == 0o2750 else
        "unexpected", '/var/log/oracle-cloud-agent directory permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent/agent.log').st_mode
                  ) == 0o644 else 'unexpected',
        '/var/log/oracle-cloud-agent/agent.log permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent/updater.log').st_mode
                  ) == 0o644 else 'unexpected',
        '/var/log/oracle-cloud-agent/updater.log permissions'),
    # Ownership checks via correct_ownership_p(owner, group, path).
    Check(lambda: correct_ownership_p('root', 'root', '/var/log'),
          '/var/log directory ownership'),
    Check(
        lambda: correct_ownership_p('oracle-cloud-agent', 'adm',
                                    '/var/log/oracle-cloud-agent'),
        '/var/log/oracle-cloud-agent directory ownership'),
    Check(
        lambda: correct_ownership_p('oracle-cloud-agent', 'adm',
                                    '/var/log/oracle-cloud-agent/agent.log'),
        '/var/log/oracle-cloud-agent/agent.log file ownership'),
    Check(
        lambda: correct_ownership_p('oracle-cloud-agent-updater', 'adm',
                                    '/var/log/oracle-cloud-agent/updater.log'),
        '/var/log/oracle-cloud-agent/updater.log file ownership'),
    # Account checks: getpwnam() raises for an unknown user, in which case
    # Check stores the 'missing' fallback instead of "exists".
    Check(lambda: getpwnam('oracle-cloud-agent') and "exists",
          'oracle-cloud-agent user', 'missing'),
    Check(lambda: getpwnam('adm') and "exists", 'adm user', 'missing'),
    Check(group_available, 'checking GID availability'),
    Check(clock_skew, 'checking clock skew'),
    Check(check_prelinking, 'checking prelinking'),
    # Extra steps that run only when the recorded `uname -a` output contains
    # exactly one 'el6uek' or exactly one '.el6.' (split() yields two parts).
    # NOTE(review): 384 is presumably the size of one btmp record -- confirm.
    If(
        Or(lambda tar, cmds, checks: len(cmds['uname -a'].split('el6uek')) == 2,
           lambda tar, cmds, checks: len(cmds['uname -a'].split('.el6.')) == 2),
        Progn(
            Check(
                lambda: "ok"
                if (stat('/var/log/btmp').st_size % 384 == 0) else "corrupt",
                'btmp size'), Copyfile('/etc/init/oracle-cloud-agent.conf'),
            Copyfile('/etc/init/oracle-cloud-agent-updater.conf'))),
    # Memory maps (pmap -x via sudo) of processes whose executable basename
    # matches exactly.
    Pmap('oracle-cloud-agent'),
    Pmap('oracle-cloud-agent-updater'),
    Copyfile('/etc/multipath.conf'),
    # 'State:' line of every process whose command line contains the text.
    RunState('oracle-cloud-agent')
]

if __name__ == '__main__':
    # The archive is named after the time the collection starts.
    started = time.localtime()
    outfile = time.strftime('oca-diag-%Y-%m-%d.%H-%M-%S.tar.gz', started)

    # Run every operation and write the JSON reports into the archive.
    tar_file = Tar(outfile, operations)
    tar_file()

    # Dropping the only reference lets Tar.__del__ close the archive
    # (immediately on CPython) before the path is reported to the user.
    del tar_file
    print('Diagnostics collected in ' + outfile)