LangChain の LLM Streaming を Web と組み合わせたり制御したり

FacebooktwitterredditpinterestlinkedinmailFacebooktwitterredditpinterestlinkedinmail

前回:https://ict-worker.com/ai/langchain-stream.html

前回は、 LangChain を使って、 LLM からの出力を Streaming ≒ Token 単位での出力を標準出力で行いました。また、標準出力以外で出力を行うためにはカスタムクラスを用いる必要があることを学習しました。

今回は、実践編として、 (大分簡易的ですが) ChatGPT のように Token 単位で Web で受信する方法と、Text 2 Speech ソフトや、 VOICEVOX のような音声合成 AI との連携を考慮して、 Streaming を制御する基本をやってみたいと思います。

Web に Streaming してみる

LangChain の Python 版を使いたいので、サーバーサイドには Flask を用います。 LangChain には JavaScript (node.js)版もあるみたいなので、場合によってはそちらの方が楽かもしれません。フロントエンドは、通常の HTML + JavaScript で行います。

サーバーサイドのコーディング

とりあえず import から。 LLM ラッパー系、 Streaming を制御するための CallbackManager & CallbackHandler 系、 Flask と Ajax 制御のために Socket.io 系です。

from gevent import monkey
monkey.patch_all()
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from typing import Any, Dict, List, Optional, Union
from langchain.callbacks.base import CallbackManager, BaseCallbackHandler
from langchain.schema import AgentAction, AgentFinish, LLMResult
from langchain.schema import HumanMessage
from flask import Flask, render_template, request
from flask_socketio import SocketIO, send, emit, join_room


import key, os

カスタム CallbackHandler

大体、前回と一緒なので、一部抜粋にします。

class MyCustomCallbackHandler(BaseCallbackHandler):
    # constructor
    streaming_handler = None
    def __init__(self, cbfunc):
        self.streaming_handler = cbfunc

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        '''新しいtokenが来たらprintする'''
        print(token)
        self.streaming_handler(token)

抜粋したところ以外も abstract なので、実装が必須な点に注意が必要です。

変えた点としては、コンストラクタに関数ポインタ(関数名に()をつけないで入れる)を受け取るようにして、 on_llm_new_token メソッドで token のみを入れて呼び出すようにしています。

on_llm_new_token で直接 token に対する処理を記述してもいいのですが、ちょっとの違いでも毎回 Class 定義から書き換えないといけない(または継承しないといけない)のが面倒なのでこのようにしています。

また、実用としては他の on_llm_* についても関数ポインタを呼び出すようにして、Noneであれば何もしないという実装がお手軽でしょう(デバッグしやすいかどうかはともかく)。

LLM の作成と Streaming Handler の実体

human_message = ''
ai_message = ''

def handle_token(token):
    global ai_message
    ai_message = ai_message + token
    socketio.emit('token', {'token': token}, room='gpt')
os.environ["OPENAI_API_KEY"] = key.OPEN_API_KEY

llm = ChatOpenAI(streaming=True, callback_manager=CallbackManager([MyCustomCallbackHandler(handle_token)]), verbose=True, temperature=0)

handle_token が実際に token を受け取って処理する関数とします(そのため、今回のプログラムでは llm の predict メソッドなどの返り値は重要ではありません)。

socketio (後ほど実体化します) の emit メソッドを使って、 gpt room に対して、 token を送信しています。LLM の token を受けて、かつ、 Ajax の非同期通信部分を担う関数です。

LLM ラッパーである ChatOpenAI では streaming スイッチの True をし、 CallbackManager のリストには、カスタマイズした CallbackHandler のコンストラクタを渡しています。ここの前回との違いは、 handle_token 関数のポインタが渡されている点です。

Flask と SocketIO でサーバーサイド実装

app = Flask(__name__)
app.config['SECRET_KEY'] = 'my_secret'
socketio = SocketIO(app, cors_allowed_origins='*', async_mode='gevent')

@app.route('/')
def index():
    return render_template('index.html')

# send human_message and ai_message when user connect to the server
@socketio.on('connect')
def connect(auth):
    global human_message
    global ai_message
    print('Client connected')
    join_room('gpt')
    #emit('ai_message', {'ai_message': ai_message})

# update textarea s when user input new message
@socketio.on('human_message')
def handle_human_message(text):
    global human_message
    global llm
    human_message = text
    llm(messages=[HumanMessage(content=human_message)])

if __name__ == '__main__':
    print('run')
    socketio.run(app, debug=False)

英語でコメント書いているのは、CodeWhisperer で楽をしようとしているからですね 🙂 日本語の扱いがかなり苦手なので、お金払って商用で使うほどではないかなと思います。

閑話休題。

デフォルトの localhost:5000 にアクセスしたときには、 index 関数により templates/index.html ファイルを返します(特別、サーバーサイドのレンダリングなどは行いません)。同時に、 connect 関数により、 gpt という room に参加させます。 ai_message に LLM からの過去の出力を登録しておく場合には、ここで記録を emit してもいいでしょう。

ユーザーからの入力は、 human_message  イベントにより受け取ります。処理としては、テキストを llm に渡すだけです。今回は一人で接続するテストなのでいいですが、複数人で同じサーバーに接続する場合にはこれだと多分すぐに破綻しますので、気をつけてください(とりあえずグローバル変数は減らすのと、 LLM の動作をうまく排他制御するかユーザーごとにするかしないと容易にコンフリクトすると思います)。

JavaScript でクライアントサイドの Ajax 実装

<html>
  <head>
    <title>Streaming</title>
  </head>
  <body>
    <textarea id="ai" rows="10" cols="60"></textarea>
    <input id="human" type="text">
    <input id="sendButton" type="submit">
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js" integrity="sha512-q/dWJ3kcmjBLU4Qc47E4A9kTB4m3wuTY7vkFJDTZKjTs8jhyGQnaUrxa0Ytd0ssMZhbNua9hE+E7Qv1j+DyZwA==" crossorigin="anonymous"></script>
    <script type="text/javascript" charset="utf-8">
      var socket = io();
      socket.on('connect', function() {
          socket.emit('my event', {data: 'I\'m connected!'});
      });
      var output = '';


      // テキストエリアの更新
      socket.on('token', function(msg) {
        output += msg.token;
        document.getElementById('ai').textContent = output;
      })


      document.getElementById('sendButton').addEventListener('click', ()=> {
        let humanMessage = document.getElementById('human').value;


        if(humanMessage != ''){
            socket.emit('human_message', humanMessage);
        }
        document.getElementById('human').value = '';
        output = '';
      })
    </script>
  </body>
</html>

HTML が読み込まれたら、まずは SocketIO をインスタンス化し、 connect で接続し gpt room に join します。

後は、input にテキストを入力した状態で送信ボタンをクリックしたら、human_message イベントとしてサーバーに入力情報を送信し、 token イベントを待ち受けて送られてきたテキスト情報(token)を textarea に追記します。

動作させると、非常に不格好ですし、 memory も実装されていませんが ChatGPT のように日本語であれば大体1文字ずつ、英語であれば単語ごとに出力されることが確認できます。

メモリの実装については以下:

ConversationSummaryBufferMemory で SlackBOT を ChatGPT っぽく

Streaming を少し実用的に考えて見る

ChatGPT 風の1文字ずつ出力もいいのですが、例えば Slack などのチャットツールでは不便極まりないですし、 HTML でもちょっと鬱陶しいです。かなり小分けなので、ネットワーク的な負荷も、オーバーヘッド分無駄がありそうです。

ということで、チャットツールでもギリギリ許されそうな送信頻度である 1行ずつの送信に挑戦してみます。また、1行ずつであれば冒頭で触れた通り、Text2Speech系のツールとも相性がいいです。1文字ずつ入力して音声合成していたら、日本語では確実に読み上げが破綻してしまいます。かといって、長めの LLM の出力を待っていてはユーザーからの入力に対して、音声の出力が遅くなりすぎてしまいます。

キャラクターとの会話 AI を作りたい人や、音声受付などではこの手の調整が恐らく重要になってくると思います(句読点でも chunk として送信した方がいい場合もあるかもしれませんね)。

def handle_token(token):
    global ai_message
    ai_message = ai_message + token
    if '\n' in token:
        socketio.emit('token', {'token': ai_message}, room='gpt')
        ai_message = ''

コードとしては、 handle_token 関数で、すぐに emit するのではなく global 変数 ai_message に一回格納し、 改行コードが含まれていたら emit し、格納された変数をクリアしています。

シングルユーザーであれば、これで1行ずつテキストが出力されると思います。

ただ、ここでは LLM からの最終出力は必ず改行コードであるという前提に基づいてコーディングされています。実際には最後の改行コードが出力される前にtokenの限界が発生する場合もあるでしょう。また、改行コード以外を chunk の区切りとする場合(句読点など)もこのコードでは最後の出力が正常に処理されないでしょう。

その場合は、 llm_end メソッドでも残った内容を flush するような処理が必要になると思います。

まとめ

  • CallbackHandler に関数ポインタを渡して外側でそれぞれの Callback を処理した方が楽
  • Webサーバーなど、複数人で同時に処理が行われることが想定される場合、これまでのようなグローバル変数を多様したようなコードだと危ない(特にStreaming を使う場合競合しやすそう)
  • Streaming からの出力は、単に token 単位で処理する以外にも工夫することで、接続先に合わせた LLM の出力に加工できる
FacebooktwitterredditpinterestlinkedinmailFacebooktwitterredditpinterestlinkedinmail

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください

最新の記事