LongLong's Blog

分享IT技术,分享生活感悟,热爱摄影,热爱航天。

使用Python进行CAN总线通信

控制器局域网总线(CAN,Controller Area Network)是一种用于实时应用的串行通讯协议总线,它可以使用双绞线来传输信号,是世界上应用最广泛的现场总线之一。CAN协议用于汽车中各种不同元件之间的通信,以此取代昂贵而笨重的配电线束。

1. CAN总线的物理结构

CAN通讯是异步通讯,只有CAN_H和CAN_L两条信号线,共同构成一组差分信号线,以差分信号的形式进行通讯。使用闭环总线网络,速度快,距离短,它的总线最大长度为40m,通讯速度最高1Mbps,总线两端各有一个120欧的电阻。

CAN总线上可以挂载多个通讯节点,节点之间的信号进过总线传输,实现节点间通讯。由于 CAN 通讯协议不对节点进行地址编码,而是对数据内容进行编码的,所以网络中的节点个数理论上不受限制,只要总线的负载足够即可,可以通过中继器增强负载。

2. CAN通信协议

CAN通信协议分为标准格式和扩展格式,在使用层上两者主要的区别在于ID的长度不同,标准帧为11位,扩展帧为29位,传输数据的部分都是0~8个字节的数据段(数据前用4位的标识数据的长度)。

3. CAN接口的配置

CAN接口在Linux系统中和以太网接口一样作为一个通信接口,可以通过ifconfig和ip命令进行配置和启动。以树莓派中常见的MCP2515为例,除了在内核中配置引导对应的模块外。可通过下列命令对CAN接口进行启用

ip link set can0 up type can bitrate 1000000
ifconfig can0 txqueuelen 65536
ifconfig can0 up

如果需要在开机时启动CAN接口,可以将相关配置放在rc.local或udev中。

KERNEL=="can*"
SUBSYSTEM=="net"
ACTION="add"
PROGRAM="/sbin/ifconfig $name txqueuelen 1024"
RUN+="/usr/bin/ip link set up $name type can bitrate 500000"

并执行

udevad control --reload-rules && udevadm trigger

即可实现CAN接口的自动启动。

4. CAN接口的相关工具

可使用命令行工具cansend和candump进行进行CAN通信的快速测试,其格式为

#接收并输出can0接口的数据
candump can0
#接受并输出任何can接口的数据
candump any
#从can0端口发送标识为123,0x11223344的数据
cansend can0 123#11223344

并且在调试时可以将数据同时发送给自己一份,以便进行调试,可在启动can的命令结尾加上loopback=on

5. 使用Python进行CAN通信

在python中使用CAN通信,可安装python的can扩展,可使用pip或系统软件包管理系统进行安装python-can,其使用的方式如所示

#使用can0接口,波特率500kbps,loopback模式为关闭
can0 = can.interface.Bus(channel = 'can0', \
    interface='socketcan', bitrate=500000, local_loopback=False);
#创建id为0x00000625的CAN帧,数据为data,格式为标准帧
msg = can.Message(arbitration_id=0x00000625, \
        data=bytearray([0x40, 0x05, 0x60, 0x00, 0x00, 0x00, 0x00, 0x00]), is_extended_id=False);
#发送数据
can0.send(msg);
#接收数据,等待时间1秒
ret = can0.recv(1.0);

使用PyTorch实现强化学习

一直以为强化学习在各种神经网络框架中会有单独的模块,然而试了才发现其用的就是一般的神经网络,只是在使用上有一套独特的逻辑——强化学习的核心是通过神经网络对行动价值函数Q(s,a)进行预测,进而选择价值函数最高的行动从而实现行动的最优化。

1. 强化学习的基本概念

强化学习相对一般类型的机器学习,其特点是通过和环境的交互中收集数据作为输入,并据此获得实现奖励最大化。

收益:t时刻后的奖励总和,其中每经理一次行动都会将之前每次的奖励都乘以折现率(0~1之间),避免出现溢出问题,同时也使得近期能够获得的奖励权重更高。

\[G_t = R_t + \gamma R_{t+1} + \gamma^2 R_{t+2} + …\ = R_t + \gamma G_{t+1}\]

状态价值函数V:在某状态S下以策略P行动获得收益的期望值。

行动价值函数Q:在某状态S下采取采取A行动,此后以策略P行动获得收益的期望。

强化学习的过程为通过蒙特卡洛方法对行动价值函数Q进行估计,从而能够在选择行动是选取使Q最大的行动A。

2. 构建强化学习的数据结构

首先需要使用一个队列来对每一次行动进行记录,从而以此来训练对应的神经网络,可以借助Python中的dqueue,对应的功能代码如下

class ReplayMemory():
    def __init__(self, capacity):
        self.memory = collections.deque(maxlen=capacity);

    def add(self, data):
        self.memory.append(data);

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size);

    def __len__(self):
        return len(self.memory);

提供了增加一项数据和从中抽取一定量数据两个方法,同时还实现了计算加入的数据总量的方法。

然后将设计几个进行数据预测的神经网络结构,这里使用三层全连接网络,其结构如下

class QNet(torch.nn.Module):
    def __init__(self, n_observations, n_action):
        super(QNet, self).__init__();
        self.layer1 = torch.nn.Linear(n_observations, 128);
        self.layer2 = torch.nn.Linear(128, 128);
        self.layer3 = torch.nn.Linear(128, n_action);

    def forward(self, x):
        x = torch.nn.functional.relu(self.layer1(x));
        x = torch.nn.functional.relu(self.layer2(x));
        return self.layer3(x);

其输入数据的维度为状态参数的维度,对应相关的状态参数,输出数据维度为行动种类的维度,对应每种行动的行动价值函数值。相当于拟合函数Q(S,A)

一般在进行强化学习时会使用两个神经网络,一个通过数据进行参数的优化,并使用改神经网络决定行动,同时将参数定期将参数同步给另一个(不太明白问什么需要这样操作,感觉可能是为了提高收敛性)。

#当前神经网络
policy_net = QNet(n_observations, n_actions).to(device);
#目标神经网络
target_net = QNet(n_observations, n_actions).to(device);

3. 强化学习的流程

进行episodes轮的行动,并在每轮中以一定的概率根据预测到的Q最大的行动进行行动,一定的概率随机行动,并记录每次行动的状态和对应的奖励为一组数据,并放置到队列中,当队列有足够的长度后,每次行动后抽取一定的数据样本,通过计算和神经网络分别得到行动价值函数,并通过优化神经网络当前神经网络减两者之间的误差。在完成一次当前神经网络的训练后,将当前神经网络和目标神经网络进行加权平均作为新的目标神经网络,如此完成一轮行动。

for i in range(0, n_episodes):
    state, info = env.reset();
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0);
    for t in itertools.count():
        with torch.no_grad():
        #根据当前网络得到行动最大价值的行动,并以一定概率选择该行动,一定概率随机行动
            action = policy_net(state).max(1).indices[0].item() \
                if random.random() > EPS_END else numpy.random.choice(n_actions);
        #获取行动后的需结果
        observation, reward, terminated, truncated, _ = env.step(action);
        next_state = None if terminated \
            else torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0);

        #将结果记录到记忆中,并更新当前状态
        memory.add((state, action, next_state, reward));
        state = next_state;

        #当记忆中的数据大于一次训练所需要的数据,进行一次神经网络的训练
        if len(memory) >= BATCH_SIZE:
            #抽取数据,并进行封装
            batch = memory.sample(BATCH_SIZE);
            state_batch = [];
            action_batch = [];
            reward_batch = [];
            next_state_batch = [];
          
            for item in batch:
                state_batch.append(item[0]);
                next_state_batch.append(item[2]);
                action_batch.append(torch.tensor([[item[1]]], device=device));
                reward_batch.append(torch.tensor([[item[3]]], device=device));
            state_batch = torch.cat(state_batch);
            action_batch = torch.cat(action_batch);
            reward_batch = torch.cat(reward_batch);

            #剔除为终止位置的数据(终止位置行动价值函数为0)
            non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, next_state_batch)), \
                device=device, dtype=torch.bool);
            non_final_next_states = torch.cat([s for s in next_state_batch if s is not None]);

            #使用当前神经网络预测行动价值函数
            state_action_values = policy_net(state_batch).gather(1, action_batch);
            next_state_values = torch.zeros(BATCH_SIZE, device=device);

            with torch.no_grad():
                #使用目标神经网络预测下一状态的价值函数
                next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values;
            expected_state_action_values = (next_state_values.unsqueeze(1) * GAMMA) + reward_batch;

            #对当前网络的预测进行优化
            loss_fn = torch.nn.SmoothL1Loss();
            loss = loss_fn(state_action_values, expected_state_action_values);
            optim.zero_grad();
            loss.backward();
            torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100);
            optim.step();
      
        #将当前网络同步到目标网络,这里使用了当前网络和目标网络的加权平均进行同步,一定程度提高收敛性 
        target_net_state_dict = target_net.state_dict();
        policy_net_state_dict = policy_net.state_dict();
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key] * \
                TAU + target_net_state_dict[key] * (1 - TAU);
        target_net.load_state_dict(target_net_state_dict);

        if terminated or truncated == True:
            print(i, t);
            break;

4. 总结

强化学习的过程是通过蒙特卡洛方法对于行动价值函数Q(s,a)进行拟合的过程,因为一旦能够准确预测行动价值函数,就很容易选择行动价值最大化的行为进行行动,同时由于对于行动价值函数的预估可能存在偏差,可始终保留一定概率继续通过蒙特卡洛方法修正行动价值函数,使得得到的模型可以不断的提升性能。