流程的中断与回复

2026年1月13日 · 499 字 · 3 分钟

关于agent在流程中的中断,以及回复的内容

流程的中断与回复

在上一节中,假如我们禁用掉获取位置的工具,那么大模型将没法知道你的位置,这时候,如果让大模型中断,询问你的位置,用户输入位置之后,流程再回复,大模型不就可以继续工作了

中断与回复的工具

package mytool

import (
	"context"
	"log"

	"github.com/cloudwego/eino/components/tool"
	"github.com/cloudwego/eino/components/tool/utils"
	"github.com/cloudwego/eino/compose"
)

const AskForClarificationToolName = "ask_for_clarification"

/*
支持中断的Tool。当用户未提供必要信息时,向用户问清楚。【固定写法】
*/
type askForClarificationOptions struct {
	NewInput *string
}

func WithNewInput(input string) tool.Option {
	return tool.WrapImplSpecificOptFn(func(t *askForClarificationOptions) {
		t.NewInput = &input
	})
}

type AskForClarificationInput struct {
	Question string `json:"question" jsonschema:"description=为了获得必要的缺失信息,你需要向用户询问的问题"`
}

// NewAskForClarificationTool 构造支持中断的Tool
func NewAskForClarificationTool() tool.InvokableTool {
	t, err := utils.InferOptionableTool(
		AskForClarificationToolName,
		"当用户的请求含糊不清或缺乏继续所需的信息时,调用此工具。在你能有效地使用其他工具之前,用它来问一个后续问题,以获得你需要的细节。",
		func(ctx context.Context, input *AskForClarificationInput, opts ...tool.Option) (output string, err error) {
			o := tool.GetImplSpecificOptions[askForClarificationOptions](nil, opts...)
			if o.NewInput == nil {
				return "", compose.Interrupt(ctx, input.Question)
			}
			return *o.NewInput, nil
		})
	if err != nil {
		log.Fatal(err)
	}
	return t
}

缓存会话

最好使用redis

import (
	"context"

	"github.com/cloudwego/eino/compose"
)

/*
创建一个类,实现接口
	type CheckPointStore interface {
		Get(ctx context.Context, checkPointID string) ([]byte, bool, error)
		Set(ctx context.Context, checkPointID string, checkPoint []byte) error
	}
*/

type InMemoryStore struct {
	mem map[string][]byte //  基于本地内存 存储KV,在生产环境中最好基于Redis来存储KV,因为一旦go进程挂了,数据就丢失了
}

func (i *InMemoryStore) Set(ctx context.Context, key string, value []byte) error {
	// log.Printf("store %s", key)
	i.mem[key] = value
	return nil
}

func (i *InMemoryStore) Get(ctx context.Context, key string) ([]byte, bool, error) {
	// log.Printf("read %s", key)
	v, ok := i.mem[key]
	return v, ok, nil
}

func NewInMemoryStore() compose.CheckPointStore {
	return &InMemoryStore{
		mem: make(map[string][]byte),
	}
}

调用工具

func InterruptResume() {
	ctx := context.Background()

	//创建model
	model := mytool.CreateDeepseekModel()
	//创建工具集
	tools := []tool.BaseTool{
		//时间
		mytool.CreateTimeTool(),
		//天气
		mytool.CreateWeatherTool(),
		//中断工具
		mytool.NewAskForClarificationTool(),
		//位置
		//mytool.CreateLocationTool(),
	}
	//创建agent
	agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
		Name:        "activity_agent",
		Description: "活动策划大师",
		Instruction: "请根据未来天气安排活动", //一个agent可能是另一个agent的subagent,所以agent的功能需要描绘清楚
		Model:       model,
		ToolsConfig: adk.ToolsConfig{
			ToolsNodeConfig: compose.ToolsNodeConfig{
				Tools: tools,
			},
		},
	})
	if err != nil {
		panic(err)
	}
	//运行agent
	runner := adk.NewRunner(ctx, adk.RunnerConfig{
		Agent:           agent,
		CheckPointStore: mytool.NewInMemoryStore(),//实现中断与回复需要一个缓存来记录会话
	})
	messages := []adk.Message{
		{Role: schema.User, Content: "明天的天气适合开运动会吗"},
	}

	checkPointId := xid.New().String()// 指定使用的断点ID

	//返回一个迭代器
	iter := runner.Run(ctx, messages, adk.WithCheckPointID(checkPointId))// 指定使用的断点ID
	//循环迭代器
	var lastMsg adk.Message
LB:
	for {
		//迭代器获取事件
		event, ok := iter.Next()
		if !ok {
			break
		}
		if event.Err != nil {
			panic(event.Err)
		}
		msg, err := event.Output.MessageOutput.GetMessage()
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s:%s\n", msg.Role, msg.Content)
		lastMsg = msg
        //遍历使用的工具,查看是否调用中断回复工具
		if len(msg.ToolCalls) > 0 {
			for _, toolCall := range msg.ToolCalls {
				if toolCall.Function.Name == mytool.AskForClarificationToolName {
					var req mytool.AskForClarificationInput
					if err := sonic.UnmarshalString(toolCall.Function.Arguments, &req); err == nil {
                        //先向用户展示问题
						fmt.Println(req.Question)
						// 从终端获取用户的输入
						scanner := bufio.NewScanner(os.Stdin)
						fmt.Println("请在此输入答案:")
						scanner.Scan()
						fmt.Println()
						input := scanner.Text()

						// 从断点处恢复
						iter, err = runner.Resume(ctx, checkPointId,
							// 使用用户输入的答案,继续执行Agent
							adk.WithToolOptions([]tool.Option{mytool.WithNewInput(input)}))
						if err != nil {
							log.Fatal(err)
						}
						// log.Printf("resume from checkpoint %s",checkPointId)
						continue LB
					} else {
						log.Fatal(err)
					}
				}
			}
		}
	}
	fmt.Println("lastMsg:", lastMsg.Content)
}

调用结果

assistant:我需要知道您所在的城市才能为您查询明天的天气情况。请问您是在哪个城市呢?这样我就能为您查询明天的天气预报,并判断是否适合开运动会了。
请问您是在哪个城市呢?这样我就能为您查询明天的天气预报,并判断是否适合开运动会了。
请在此输入答案:
鄂尔多斯市 150600

tool:鄂尔多斯市 150600
assistant:好的,我现在为您查询鄂尔多斯市明天的天气预报,然后分析是否适合开运动会。
tool:天气预报: 白天天气: 扬沙, 夜晚天气: 晴, 白天温度: 2℃, 夜晚温度: -5℃, 白天风向: 西北, 夜晚风向: 西北, 白天风力: 1-3, 夜晚风力: 1-3
assistant:根据鄂尔多斯市明天的天气预报,我来分析是否适合开运动会:

**明天天气情况:**
- 白天天气:扬沙
- 夜晚天气:晴
- 白天温度:2℃
- 夜晚温度:-5℃
- 风向:西北风
- 风力:1-3级

**是否适合开运动会的分析:**

**不建议开运动会的原因:**
1. **扬沙天气**:这是最主要的问题。扬沙天气会导致:
   - 空气质量差,能见度降低
   - 沙尘可能影响运动员的呼吸和视线
   - 场地和设备容易积沙
   - 观众和工作人员也会感到不适

2. **温度较低**:白天只有2℃,对于户外运动会来说偏冷,运动员和观众都可能感到寒冷不适。

**建议:**
考虑到扬沙天气对户外活动的严重影响,**不建议明天在鄂尔多斯市开运动会**。扬沙不仅会影响比赛质量,还可能对参与者的健康造成影响。

如果您必须举办运动会,建议:
1. 延期到天气条件更好的日子
2. 或者考虑在室内场馆举办

您希望我查询后天或大后天的天气情况,看看是否有更适合的日子吗?
lastMsg: 根据鄂尔多斯市明天的天气预报,我来分析是否适合开运动会:

**明天天气情况:**
- 白天天气:扬沙
- 夜晚天气:晴
- 白天温度:2℃
- 夜晚温度:-5℃
- 风向:西北风
- 风力:1-3级

**是否适合开运动会的分析:**

**不建议开运动会的原因:**
1. **扬沙天气**:这是最主要的问题。扬沙天气会导致:
   - 空气质量差,能见度降低
   - 沙尘可能影响运动员的呼吸和视线
   - 场地和设备容易积沙
   - 观众和工作人员也会感到不适

2. **温度较低**:白天只有2℃,对于户外运动会来说偏冷,运动员和观众都可能感到寒冷不适。

**建议:**
考虑到扬沙天气对户外活动的严重影响,**不建议明天在鄂尔多斯市开运动会**。扬沙不仅会影响比赛质量,还可能对参与者的健康造成影响。

如果您必须举办运动会,建议:
1. 延期到天气条件更好的日子
2. 或者考虑在室内场馆举办