著者:しょっさん
自動的にホストが登録される部分までのプログラムは、これまでずいぶんとリファクタリングを繰り返しながら作ってきました。今回は、残りのサーバによる定期監視部分です。以前、「TaskControl」クラスを作って、将来の実装に備えました。そのTaskControlクラスの改修と、それを制御するためのメインとなる部分を準備します。
従って、今回のプログラミング作業は次の3点です。
1. 全体を制御するメインルーチンの改修
2. 死活を監視するためのTaskControlクラスの完成
3. サーバ側に合わせたエージェント側の改修
記事本文掲載のシェルスクリプトマガジンvol.53は以下のリンク先でご購入できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# coding: utf-8 # filename: lib/reins.rb require "reins/version" require "reins/dispatcher" require "reins/auth_service" require "reins/host_registry" require "reins/task_control" require "reins/config" require "json" require "socket" require "ipaddr" require 'thwait' module Reins class << self # 指定されたポートでサーバを起動する # == パラメータ # port:: 割り当てるポート番号 # == 返り値 # Exception:: サーバが起動できなかった場合、例外を発生させ起動させない # TCPSocket:: 正常に起動した場合、Socket オブジェクトを返す def run_server(port) Reins.logger.info("Reins #{VERSION} を #{port} で起動します") TCPServer.new(port) rescue => e Reins.logger.error("Reins が起動できませんでした: #{e}") puts(e.to_s) exit end # 起動されているサーバを停止する # == パラメータ # server:: 起動中のサーバの Socket # == 返り値 # 特になし def exit_server(server) Reins.logger.info("Reins #{VERSION} を終了します") Reins.regist_host.store server.close exit end # ブロックとして定義されたコードをスレッド化して実行する # == パラメータ # ブロック:: スレッドとして実行したいコード # == 返り値 # Thread:: 生成されたスレッド def threaded Thread.new do loop do yield end end end # クライアントからの接続を待ち受ける # == パラメータ # TCPSocket:: Socket オブジェクト # == 返り値 # Thread:: 生成されたスレッド def connect_client(server) Thread.start(server.accept) do |c| client = Reins::Clients.new(c) status = {} status["keycode"] = client.keycode status["result"] = client.command == 'auth' ? client.run_auth : client.run_command c.puts JSON.pretty_generate(status) c.close end end # クライアントとの定義されたタスクを5秒間隔で実行するよう制御 # == パラメータ # 特になし # == 返り値 # 特になし def check_client threads = [] Reins.regist_host.read_hosts.each do |host| threads << Thread.new do task = TaskControl.new(host) task.check_agent end end ThreadsWait.all_waits(*threads) sleep 5 end end class Clients attr_reader :addr, :keycode, :command, :options def initialize(client) @message = JSON.parse(client.gets) Reins.logger.debug(@message) @addr = IPAddr.new(client.peeraddr[3]).native.to_s @keycode = @message["keycode"] @command = @message["command"] @options = @message["options"] end # 認証処理を行う # == パラメータ # 特になし # == 返り値 # key:: 認証が成功した場合は接続用の認証キーを返す # false:: 認証が失敗した時は "false" 文字列を返す def run_auth Reins.logger.debug("#{addr} : 認証を行います") if (key = Reins.auth_service.authenticate_key(keycode, addr)) key else "false" end end # サーバで実行されるコマンドを受け渡す # == パラメータ # 特になし # == 返り値 # false:: 実行できなければ "false" 文字列を返す # false以外:: 実行された結果を、改行を含む文字列で返す def run_command Reins::Dispatch.new(addr, keycode).command(command, options) end end def start server = run_server(Reins.port) clients_thread = threaded {connect_client(server)} tasks_thread = threaded {check_client} tasks_thread.join clients_thread.join rescue Interrupt exit_server(server) end module_function :start end |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# filename: task_control.rb module Reins class TaskControl # クライアント定義 # == パラメータ # hostname:: 接続先クライアントのホスト名、またはIPアドレス # port:: 接続先クライアントのTCPポート番号。通常は 24368 # == 返り値 # 特になし def initialize(hostname = '127.0.0.1', port = 24_368) @addr = hostname @port = port @keycode = Reins.regist_host.read_hostkeys[@addr] end # クライアントとの接続 # == パラメータ # 特になし # == 返り値 # TCPScoket:: TCPのソケット情報 def connection TCPSocket.open(@addr, @port) rescue => e Reins.logger.error("#{e}: クライアントへの接続でエラーが発生しました") dead end # クライアントとの死活確認 # == パラメータ # 特になし # == 返り値 # true:: 生存確認 # false:: クライアントが停止、またはネットワーク上に問題あり def viability(s) s.puts(JSON.generate("keycode" => @keycode.to_s, "command" => "watch")) raise unless s.gets.chomp == "OK" alive rescue => e Reins.logger.error("#{e}: クライアントへの接続でエラーが発生しました") dead end # ステータスコードを「alive」に変更 # == パラメータ # 特になし # == 返り値 # true:: ステータスの変更に成功 # false:: ステータスの変更に失敗 def alive Reins.regist_host.set_status(@addr, @keycode, "alive") if Reins.regist_host.get_status(@addr, @keycode) == "dead" end # ステータスコードを「dead」に変更 # == パラメータ # 特になし # == 返り値 # true:: ステータスの変更に成功 # false:: ステータスの変更に失敗 def dead Reins.regist_host.set_status(@addr, @keycode, "dead") if Reins.regist_host.get_status(@addr, @keycode) == "alive" end # クライアントエージェントとの応答確認 # == パラメータ # 特になし # == 返り値 # 特になし def check_agent Reins.logger.debug("#{@addr} 宛にチェックを行います...") viability(connection) close end # クライアントとの接続を切断 # == パラメータ # 特になし # == 返り値 # nil:: 正常に切断 def disconnect @s.close end end end |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# filename: reins_agent.rb require "reins_agent/config" require "reins_agent/version" require "json" require "ipaddr" require "socket" module ReinsAgent class << self # サーバへ接続した後、指定したポート番号でサービスを開放する # == パラメータ # port:: エージェントが起動するためのポート番号 # == 返り値 # agent:: 起動されたサービスのソケット情報(@agent でも参照可) def run_agent(port) ReinsAgent.logger.info("Reins Agent #{VERSION} を #{port} で起動します") auth_command = JSON.generate("command" => "auth", "keycode" => ReinsAgent.client_key.to_s) throw unless (status = JSON.parse(ReinsAgent.exec(auth_command))) @cert_key = status["result"] ReinsAgent.logger.debug("認証キー : #{@cert_key} で認証が完了しました") @agent = TCPServer.new(ReinsAgent.client_port) rescue => e ReinsAgent.logger.fatal("Reins Agent が起動できませんでした: #{e}") exit end # サーバからのデータをパースする # == パラメータ # r:: リモートサーバのソケット情報 # == 返り値 # 特になし、ただし @message 連想配列へ、取得した情報を取得 def define_value(r) @message = JSON.parse(r.gets) @message["IP address"] = IPAddr.new(r.peeraddr[3]).native.to_s ReinsAgent.logger.debug("addr = #{@message['IP address']}, keycode = #{@message['keycode']}, command = #{@message['command']}, options = #{@message['options']}") end # ブロックとして定義されたコードをスレッド化して実行する # == パラメータ # ブロック:: スレッドとして実行したいコード # == 返り値 # Thread:: 生成されたスレッド def threaded Thread.new do loop do yield end end end # エージェントの終了処理 # == パラメータ # 特になし # == 返り値 # 常に終了 def exit_agent ReinsAgent.logger.info("Reins Agent #{VERSION} を終了します") delete_command = JSON.generate("command" => "delete", "keycode" => @cert_key.to_s) ReinsAgent.exec(delete_command) @agent.close exit end # サーバへコマンドを送信し、その返り値を取得する # == パラメータ # command:: サーバへ送信するコマンド+オプションを指定 # == 返り値 # response:: サーバからの返り値(改行付き/複数行) def exec(command) ReinsAgent.logger.debug("コマンドを実行します : #{command}") s = TCPSocket.open(ReinsAgent.server_host, ReinsAgent.server_port) s.puts command response = s.read s.close if s response end # サーバからの接続用常駐エージェントを起動 # == パラメータ # 特になし # == 返り値 # 特になし def connect_agent Thread.start(@agent.accept) do |r| define_value(r) r.puts("OK") r.close end end end # def start run_agent(ReinsAgent.client_port) list_command = JSON.generate("command" => "list", "keycode" => @cert_key.to_s) puts ReinsAgent.exec(list_command) server_thread = threaded {connect_agent} server_thread.join rescue Interrupt exit_agent end module_function :start end |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
#!/usr/bin/env ruby # ThreadsWait が使いたくて require 'thwait' # スレッドの実行前に、ひとまず "100" と定義しておく a = 100 threads = [] puts "実行前-親スレッド = #{a}" # 5つのスレッドを起動し、各々で、変数"a"の状態を変更してみる 5.times { |i| threads << Thread.new do a = i puts "子スレッド = #{a}" end } # 全てのスレッドの終了を待つ... ThreadsWait.all_waits(*threads) # さて、実行後、変数"a" はどうなったか? puts "実行後-親スレッド = #{a}" |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
#!/usr/bin/env ruby # 実行前に親プロセスとなる、自分のプロセスIDを保管しておく a = Process.pid puts "実行前-親プロセス = #{a}" # 子プロセスを5つ起動し、親プロセスから引き継いだ、変数"a"の内容を、子プロセスのプロセスIDで上書きしてみる 5.times do fork do a = Process.pid puts "子プロセス = #{a}" end end # 5つの子プロセスの終了を待つ 5.times do Process.wait end # さて、実行後、変数"a" はどうなったか? puts "実行後-親プロセス = #{a}" |