Source code for AFL.automation.APIServer.CAStatusPublisher

import os
import json
import threading
from caproto.server import PVGroup, pvproperty, run

class QueueStatusGroup(PVGroup):
    """PVGroup publishing queue status via EPICS Channel Access."""
    queue_state = pvproperty(value='Ready', dtype=str)
    queue_json = pvproperty(value='[]', dtype=str)
    running_task = pvproperty(value='[]', dtype=str)
    driver_status = pvproperty(value='{}', dtype=str)

    def __init__(self, queue_daemon, **kwargs):
        self.queue_daemon = queue_daemon
        super().__init__(**kwargs)

    @queue_state.scan(period=1.0)
    async def queue_state(self, instance, async_lib):
        if self.queue_daemon.paused:
            state = 'Paused'
        elif self.queue_daemon.debug:
            state = 'Debug'
        elif self.queue_daemon.busy:
            state = 'Active'
        else:
            state = 'Ready'
        await instance.write(state)

    @queue_json.scan(period=1.0)
    async def queue_json(self, instance, async_lib):
        try:
            queue_items = list(self.queue_daemon.task_queue.queue)
            await instance.write(json.dumps(queue_items))
        except Exception:
            await instance.write('[]')

    @running_task.scan(period=1.0)
    async def running_task(self, instance, async_lib):
        try:
            await instance.write(json.dumps(self.queue_daemon.running_task))
        except Exception:
            await instance.write('[]')

    @driver_status.scan(period=1.0)
    async def driver_status(self, instance, async_lib):
        try:
            status = self.queue_daemon.driver.status()
            await instance.write(json.dumps(status))
        except Exception:
            await instance.write('{}')


[docs] class CAStatusPublisher(threading.Thread): """Thread running a caproto server to publish queue status PVs."""
[docs] def __init__(self, queue_daemon, prefix='AFL:', port=5064, interfaces=None): super().__init__(daemon=True) self.queue_daemon = queue_daemon self.prefix = prefix self.port = port self.interfaces = interfaces or ['0.0.0.0']
[docs] def run(self): os.environ['EPICS_CA_SERVER_PORT'] = str(self.port) ioc = QueueStatusGroup(self.queue_daemon, prefix=self.prefix) run( ioc.pvdb, interfaces=self.interfaces, log_pv_names=False, )