LINC-BIT's picture
Upload 1912 files
b84549f verified
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os
from .base_channel import BaseChannel
command_path = "./commands"
runner_commands_file_name_prefix = "runner_commands"
manager_commands_file_name = "manager_commands.txt"
class FileChannel(BaseChannel):
def __init__(self, args):
self.node_id = args.node_id
self.out_file = None
self.in_file = None
self.in_offset = 0
self.in_cache = b""
super(FileChannel, self).__init__(args)
def _inner_open(self):
pass
def _inner_close(self):
if self.out_file is not None:
self.out_file.close()
self.out_file = None
if self.in_file is not None:
self.in_file.close()
self.in_file = None
def _inner_send(self, message):
if self.out_file is None:
if not os.path.exists(command_path):
os.makedirs(command_path, exist_ok=True)
if self.node_id is None:
file_name = os.path.join(command_path, "%s.txt" % runner_commands_file_name_prefix)
else:
file_name = os.path.join(command_path, "%s_%s.txt" % (
runner_commands_file_name_prefix, self.node_id))
self.out_file = open(file_name, "ab")
self.out_file.write(message)
self.out_file.write(b'\n')
self.out_file.flush()
def _open_manager_command(self):
full_name = os.path.join(command_path, manager_commands_file_name)
if self.in_file is not None and self.in_file.closed:
self.in_file = None
if self.in_file is None and os.path.exists(full_name):
self.in_file = open(full_name, "rb")
self.in_file.seek(self.in_offset)
def _inner_receive(self):
messages = []
if self.in_file is None:
self._open_manager_command()
if self.in_file is not None:
self.in_file.seek(0, os.SEEK_END)
new_offset = self.in_file.tell()
self.in_file.seek(self.in_offset, os.SEEK_SET)
count = new_offset - self.in_offset
if count > 0:
self.in_cache += self.in_file.read(count)
self.in_offset = new_offset
messages, self.in_cache = self._fetch_message(self.in_cache, True)
return messages