ハーベスト用ノードURL

symbol-node.harvest-xym.com

Symbol (XYM) 返金Pythonスクリプト (複数号機対応)

symbol tech Tech

はじめに

私は複数のSymbol ノードを運営しており、ノード毎に異なった返金額(手数料率)を設定しています。

お陰様で弊ノードも多数の委任者様にご支持いただいておりますが、返金作業の正確性を維持するには精神的にも時間的にも苦労が絶えません。

Symbolノード供給過多の現在は、多くのノードにおいて多様な還元策やイベントが展開されています。そのような状況下、他のノード運営者の方も私と同じくノード報酬の還元・返金に費やす時間が多いと思います。私も完全手作業から始まり、スクリプトによる送信対象、金額、コメントのリスト自動生成を行うなど、作業効率を上げてきました。

次なるステップとして作成したスクリプトがこの返金スクリプトです。返金リストの生成までスクリプトを組んでいたのですが、肝心の送金処理がまったくの手探りであったため、手を出せずにいました。

なお、アドホックに増築を繰り返したスクリプトですので、そんなにきれいなコードではないです。

※弊ノードではハーベストした委任者にノード報酬の一部をお渡しする行為を「返金」と表現しております。

概要

指定したウォレットアドレスで受け取ったノード報酬の一部をハーベスターに返金するスクリプトです。

  • 受け取り用ウォレットアドレスは1つのみ(送金兼用)
  • 複数ノード対応
  • 前回処理実行以降のノード報酬を処理対象にする
  • テストモード実装(自分自身に送金します・on/off選択可)
  • 対話モード実装(送金直前にキー入力を要求します・on/off選択可)
  • 期間中に複数回ハーベストした委任者は合計返金額を1トランザクションで返金する。

この記事の利用対象者

  • プログラミング経験者(経験のない方は絶対に使用しないでください)

使用上の注意

  • 当該スクリプトはウォレットの秘密鍵をファイルに保存して使用します。機微な情報が漏洩されない堅牢な環境で実行することを推奨いたします。漏洩リスクを下げるため、スクリプトを実行する都度、ファイルに設定することを推奨いたします。
  • 筆者は当該スクリプトに直接的、間接的に起因する一切の損害について責任を追うものではありません。そのまま利用するのではなく参考程度にとどめてください。

ファイル構造

auto_refund
├── conf
│   └── my_nodes.json
├── logs
│   ├── refund_detail.log
│   └── refund_joural.txt
├── refund.py
├── secure
│   └── private_key.txt
└── var
    ├── address_table.txt
    └── last_block.txt

設定項目

ノード情報 my_nodes.json

[
    {
        "nodeNo": 1
        , "fqdn": "0-0symbol-node1.trivill.com"
        , "port": 3000
    }
    ,{
        "nodeNo": 2
        , "fqdn": "0-0-0-0-0-0-0-0-0-0.quantum-zero.com"
        , "port": 3000
    }
    ,{
        "nodeNo": 3
        , "fqdn": "symbol-node.harvest-xym.com"
        , "port": 3000
    }
]
nodeNo号機。号機別の返金額計算処理内で号機判定の目的で使用します。
fqdnノードのFQDN
portノードのポート

秘密鍵 private_key.txt

機密情報ですので取り扱いには十分に注意してください。

97C41******************************************************0C0A5

処理開始ブロック latest_block.txt

初回のみ返金処理を開始するブロック(前回の最終ハーベスト対象ブロック+1、もしくは今回処理対象のブロック)を入力してください。2回目以降はスクリプトが自動で更新します。

297519

スクリプト内設定箇所

### 設定 #############################################################################

# テストネット="public_test" メインネット="public"
CURRENT_NET = "public"

# トランザクション送信に使う手数料
FEE_SIZE    = 0.1

# 本番モード="PROD" テストモード="TEST"
ENV = "PROD"

# 対話モード="Y" サイレントモード="N"
INTERACTIVE = "Y"

#####################################################################################
CURRENT_NETテストネット、メインネットのどちらを使用するか。
FEE_SIZE手数料
ENVテストモードの場合、宛先を強制的に送信元アドレスに設定します。(つまり、自分から自分に送金)
INTERACTIVE対話モードで実行すると、スクリプト実行時や各送金タイミングでキーボード入力を求めます。

スクリプト実行時、引数に silent を指定することでサイレントモードで実行することも可能です。これにより、cron での実行がしやすくなりました。

返金額計算処理

# 号機別の返金(還元)額計算
def get_reward(addr, fee):
    node_no = int(delegater[addr])
    log("  Node No.: " + str(node_no))
    if node_no == 1:
	    reward = math.floor(Decimal(fee) * Decimal(0.1) / 100000) / 10
    if node_no == 2:
        if addr == "********************":
            reward = 20
        else:
            reward = 10
    if node_no == 3:
        reward = 0

    log("  Reward: " + str(reward))
    return reward

ご自身の返金(還元)ルールに基づいた処理を組み込んでください。

この例では、1号機は10%返金、2号機は10 XYM返金(特定のアドレスは20 XYM返金)、3号機は返金なしとしています。

出力ファイル

実行ログ refund_detail.log

当該スクリプトの実行ログです。

返金履歴 refund_joural.txt

返金履歴です。実際に送金する場合は同一宛先ウォレットで合算します。

今後の課題

  • アドレス変換をスマートにやりたい。(16進数→Base32)
  • 変数のスコープが・・・
  • 完全自動化(おそらくこのまま cron に設定すれば可能なはず)

コード

コードイメージ

左がソースコード、右がログファイルのイメージです。

コード全体

import requests
import sha3
import datetime
import json
import http.client
import os
import subprocess
import math
import sys
import time
from decimal import Decimal
from binascii import unhexlify
from binascii import hexlify
from symbolchain.core.CryptoTypes import PrivateKey
from symbolchain.core.sym.KeyPair import KeyPair
from symbolchain.core.facade.SymFacade import SymFacade
from symbolchain.core.sym.MerkleHashBuilder import MerkleHashBuilder
from symbolchain.core.CryptoTypes import Hash256
from symbolchain.core.CryptoTypes import PublicKey


### 設定 #############################################################################

# テストネット="public_test" メインネット="public"
CURRENT_NET = "public"

# トランザクション送信に使う手数料
FEE_SIZE    = 0.1

# 本番モード="PROD" テストモード="TEST"
ENV = "PROD"

# 対話モード="Y" サイレントモード="N"
INTERACTIVE = "Y"

#####################################################################################

current_dir	= os.path.dirname(os.path.abspath(__file__))

ticket_file		    = current_dir + '/var/last_block.txt'      # チケット(最終処理ブロック)
log_file		    = current_dir + '/logs/refund_detail.log'  # ログ
refund_joural	    = current_dir + '/logs/refund_joural.txt'  # 送信履歴
private_key_file    = current_dir + '/secure/private_key.txt'  # 秘密鍵
my_nodes_file       = current_dir + '/conf/my_nodes.json'      # ノード情報
address_table_file  = current_dir + '/var/address_table.txt'   # アドレス変換

list_harvester = []
dict_xym = {}
dict_block = {}
list_refund_joural = []

# ログ出力
def log(msg):
    f = open(log_file, 'a')
    f.write(str(msg) + "\n")

def log_print(msg):
    log(msg)
    print(msg)

# 号機別の返金(還元)額計算
def get_reward(addr, fee):
    node_no = int(delegater[addr])
    log("  Node No.: " + str(node_no))
    if node_no == 1:
	    reward = math.floor(Decimal(fee) * Decimal(0.1) / 100000) / 10
    if node_no == 2:
        if addr == "**********":
            reward = 20
        else:
            reward = 10
    if node_no == 3:
        reward = 0

    log("  Reward: " + str(reward))
    return reward

# 次回の処理開始ブロック設定
def set_block(block):
    log("brock height for next process: " + str(block))
    f = open(ticket_file, 'w')
    f.write(block)

# 全委任者取得と号機仕訳
def get_my_delegaters():
    global delegater
    delegater = {}
    for node in my_nodes:
        api_endpoint = "http://" + node['fqdn'] + ":" + str(node['port']) + "/node/unlockedaccount"
        log("API Endpoint(get_my_delegaters #" + str(node["nodeNo"]) + "): " + api_endpoint)
        r = requests.get(api_endpoint).json()
        log("Delegater address are: ")
        for i in r['unlockedAccount']:
            if len(address_table) > 1 and i in address_table:
                a32 = address_table[i]
            else:
                api_endpoint = "http://" + node['fqdn'] + ":" + str(node['port']) + "/accounts/" + i
                log("  http://" + node['fqdn'] + ":" + str(node['port']) + "/accounts/" + i)
                r = requests.get(api_endpoint).json()
                if "account" in r:
                    a32 = str(convert_pub2a32(r['account']['supplementalPublicKeys']['linked']['publicKey']))
                else:
                    a32 = " "
            new_address_table[i] = a32
            delegater[a32] = str(node["nodeNo"])
            log("  " + str(a32) + " #" + str(node["nodeNo"]))

    set_new_address_table()

# ノード報酬取得
def get_transactions():
    api_endpoint = my_node_url + "/statements/transaction?order=asc&targetAddress=" + str(address32) + "&fromHeight=" + str(block)
    log("API Endpoint(get_transactions): " + api_endpoint)
    r = requests.get(api_endpoint).json()
    return r

# 前回の最終処理ブロック取得
def load_last_blocks():
    global block
    f = open(ticket_file, 'r')
    block = f.read()
    log("  Brock height: " + str(block))

# 秘密鍵読み込み
def load_private_key():
    global private_key
    f = open(private_key_file, 'r')
    private_key = f.read()
    log("  private key: " + str(private_key))

# ノード情報読み込み
def load_my_nodes():
    global my_nodes
    f = open(my_nodes_file, 'r')
    my_nodes = json.loads(f.read())
    log("  My Nodes: " + str(my_nodes))

# アドレステーブル読み込み
def load_address_table():
    global address_table, new_address_table
    f = open(address_table_file, 'r')
    try:
        address_table = json.load(f)
    except:
        address_table = {}
    new_address_table = {}
    log("  Address Table: " + str(address_table))

def set_new_address_table():
    log("Writing new address table to file.: " + str(new_address_table))
    f = open(address_table_file, 'w')
    f.write(json.dumps(new_address_table))

def set_journal():
    log("Writing journal.: ")
    log("  " + str(list_refund_joural))
    f = open(refund_joural, 'w')
    for i in list_refund_joural:
        f.write(str(i) + "\n")

# 初期設定処理
def init():
    global facade, start_time, mosaics_id, node_adr, address32, my_node_url, network

    log("Init start")
    load_private_key()
    load_last_blocks()
    load_my_nodes()
    load_address_table()

    if CURRENT_NET == "public_test":
        start_time = 1616694977
        mosaics_id = 0x091F837E059AE13C
        node_adr = "sym-test-01.opening-line.jp"
        network = "TEST_NET"

    elif CURRENT_NET == "public":
        start_time = 1615853185
        mosaics_id = 0x6BED913FA20223F8
        node_adr = my_nodes[0]['fqdn']
        network = "MAIN_NET"

    facade = SymFacade(CURRENT_NET)
    
    address32 = convertPri2A32(private_key)
    my_node_url = "http://" + node_adr + ":" + "3000"
    get_my_delegaters()

# 秘密鍵 to Address(Base32)変換
def convertPri2A32(pkey):
    global alice_pubkey, alice_keypair
    alicePrikey     = PrivateKey(unhexlify(private_key))
    alice_keypair   = KeyPair(alicePrikey)
    alice_pubkey    = alice_keypair.public_key
    alice_address   = facade.network.public_key_to_address(alice_pubkey)
    log(" MyAddress(Base32) : " + str(alice_address))
    return alice_address

# Hex to Address(Base32)変換
def convert_hex2a32(address_hex):
   return subprocess.check_output(['symbol-cli','converter','hexToBase32Address','-a',address_hex]).decode('utf-8').replace('\n','').replace('-','')

# Pub Key to Address(Base32)変換
def convert_pub2a32(pub_key):
   return facade.network.public_key_to_address(PublicKey(pub_key))
   #return subprocess.check_output(['symbol-cli','converter','publicKeyToAddress','-n',network,'-p',pub_key]).decode('utf-8').replace('\n','').replace('-','')

# 返金用のジャーナルを作成する
def generate_address_list(tran):
    global list_harvester, dict_xym, dict_block

    height = int(tran['statement']['height'])
    log("Transaction start (Block: " + str(height) + ")")
    harvesterHex = tran['statement']['receipts'][0]['targetAddress']
    log("  harvesterHex: " + harvesterHex)
    harvester = str(convert_hex2a32(harvesterHex))
    log("  harvester: " + harvester )
    reward = get_reward(harvester, tran['statement']['receipts'][2]['amount'])
    log("----------------------------------------------------------------------------------------------------")
    if reward == 0:
        return

    if harvester not in list_harvester:
        list_harvester = list_harvester + [harvester]

    if harvester in dict_xym:
        dict_xym[harvester] = Decimal(dict_xym[harvester]) + Decimal(reward)
    else:
        dict_xym[harvester] = reward
    
    if harvester in dict_block:
        dict_block[harvester] = dict_block[harvester] + ", " + str(height)
    else:
        dict_block[harvester] = str(height)
    return height

def generate_journal():
    global list_harvester
    for h in list_harvester:
        list_refund_joural.append([h,dict_xym[h],"Congrats on your harvesting! I give you a refund. (Block: " + str(dict_block[h]) + ")"])
    set_journal()

def refund():
    log("Create transation.:")
    global list_refund_joural
    address_tx = []
    for address in list_refund_joural:
        log("  " + str(address))

        if ENV == "PROD":
            adr = address[0]
            msg = address[2]
        else:
            adr = str(address32)
            msg = "staging: " + address[2] + "\n" + address[0]
        
        deadline = (int((datetime.datetime.today() + datetime.timedelta(hours=2)).timestamp()) - start_time) * 1000

        tx  = facade.transaction_factory.create({
            'type': 'transfer',
            'signer_public_key': alice_pubkey,
            'fee': int(FEE_SIZE * 1000000),
            'deadline': deadline,
            'recipient_address' : SymFacade.Address(adr.replace('-','')),
            'mosaics': [(mosaics_id, int(address[1] * 1000000))],
            'message': bytes(1) + msg.encode('utf8')
        })
        signature = facade.sign_transaction(alice_keypair, tx)
        tx.signature = signature.bytes

        address_tx.append(tx)
        log("  to: " + adr)
        log("  amount: " + str(address[1]))
        log("  msg: " + msg.replace('\n',''))
        print(address[0],address[1],address[2])
        if INTERACTIVE == "Y":
            a = input('Hit any key after refund [' + ENV + ']:')

        # ネットワークへアナウンス
        log("Send transation.:")
        payload = {"payload": hexlify(tx.serialize()).decode('utf8').upper()}
        jsonPayload = json.dumps(payload)
        headers = {'Content-type': 'application/json'}
        conn = http.client.HTTPConnection(node_adr, 3000)
        conn.request("PUT", "/transactions", jsonPayload, headers)
        response = conn.getresponse()
        log("  Response status: " + str(response.status))
        log("  Response reason: " + response.reason)

        # 確認
        hash = facade.hash_transaction(tx)
        log('  http://' + node_adr + ':3000/transactionStatus/' + str(hash))
        log("----------------------------------------------------------------------------------------------------")

# Main
def main():
    global INTERACTIVE
    log("━━━━━ start [" + str(datetime.datetime.now()) + "] ━━━━━━━━━━━━━━━━━━━━")
    log_print("Environment: " + ENV)
    log_print("Network: " + CURRENT_NET)

    for i in sys.argv:
        if i == "silent":
            INTERACTIVE = "N"
    
    if INTERACTIVE == "Y":
        a = input('Hit any key to start:')
    else:
        log_print("Silent mode.")
        time.sleep(5)
    init()
    trans = get_transactions()
    log("Transaction count: " + str(len(trans)))
    next_brock = block
    for tran in trans['data']:
        log(str(tran))
        next_brock = generate_address_list(tran)
    generate_journal()
    refund()
    if len(trans['data']) > 0:
        next_brock = next_brock + 1
        if ENV == "PROD":
            set_block(str(next_brock))
    else:
        log_print("No data found.")
        
    log_print("end")

if __name__ == "__main__":
    main()
タイトルとURLをコピーしました