сделать пакет поддержкой python 3

исходный код, который поддерживает только python 2, находится здесь

ссылка на thinkgear.py

Я пытаюсь отредактировать его для поддержки python 3. отредактированный код находится здесь:

import sys

import serial

from io import BytesIO

import struct

from collections import namedtuple

import logging
import logging.handlers

import sys
import time

import datetime

global delta 
delta = []

_log = logging.getLogger(__name__)

_bytelog = logging.getLogger(__name__+'.bytes')
_bytelog.propagate = False
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
_log.addHandler(fh)

class ThinkGearProtocol(object):

    def __init__(self, port):

        self.serial = serial.Serial(port, 57600)
        self.preread = BytesIO()
        self.io = self.serial

    @staticmethod
    def _chksum(packet):
        return ~sum(c for c in packet ) & 0xff

    def _read(self, n):
        buf = self.io.read(n)
        if len(buf) < n:
            _log.debug('incomplete read, short %s bytes', n - len(buf))
            if self.io == self.preread:
                _log.debug('end of preread buffer')
                # self.preread.reset()
                # self.preread.truncate()
                # two line comment out
                self.io = self.serial
                buf += self.io.read(n-len(buf))
                if len(buf) < n:
                    _log.debug('incomplete read, short %s bytes', n - len(buf))

        for o in range(0, len(buf), 16):
            _bytelog.debug('%04X  '+' '.join(('%02X',)*len(buf[o:o+16])), o, *(c for c in buf[o:o+16]))

        return buf

    def _deread(self, buf):
        _log.debug('putting back %s bytes', len(buf))
        pos = self.preread.tell()
        self.preread.seek(0, 2)
        self.preread.write(buf)
        self.preread.seek(pos)
        self.io = self.preread

    def get_packets(self):
        last_two = ()
        while True:
            last_two = last_two[-1:]+(self._read(1),)
            # _log.debug('last_two: %r', last_two)
            if last_two == (b'\xAA',b'\xAA'):
                plen = self._read(1)
                if plen >= b'\xAA':
                    # Bogosity
                    _log.debug('discarding %r while syncing', last_two[0])
                    last_two = last_two[-1:]+(plen,)

                else:
                    last_two = ()
                    packet = self._read(int.from_bytes((plen), byteorder='big'))
                    # _log.debug(plen)
                    checksum = self._read(1)

                    if ord(checksum) == self._chksum(packet):
                        yield self._decode(packet)

                    else:
                        _log.debug('bad checksum')
                        self._deread(packet+checksum)

            elif len(last_two) == 2:
                _log.debug('discarding %r while syncing', last_two[0])


    def _decode(self, packet):
        decoded = []

        while packet:
            extended_code_level = 0
            while len(packet) and packet[0] == '\x55':
                extended_code_level += 1
                packet = packet[1:]
            if len(packet) < 2:
                _log.debug('ran out of packet: %r', '\x55'*extended_code_level+packet)
                break
            code = packet[0]
            if code < 0x80:
                value = packet[1]
                packet = packet[2:]
            else:
                vlen = packet[1]
                if len(packet) < 2+vlen:
                    _log.debug('ran out of packet: %r', '\x55'*extended_code_level+chr(code)+chr(vlen)+packet)
                    break
                value = packet[2:2+vlen]
                packet = packet[2+vlen:]

            # _log.debug('extended_code_level is '+str(extended_code_level))
            # _log.debug('code is '+str(code))  
            # _log.debug('data_types is '+str(data_types))
            # _log.debug(not extended_code_level and code in data_types)
            # _log.debug(not bool(extended_code_level and code in data_types))
            # _log.debug((extended_code_level,code) in data_types)
            if not bool(extended_code_level and code in data_types):
                data = data_types[code](extended_code_level, code, value)
                # _log.debug('extended_code_level is '+str(extended_code_level))
                # _log.debug('code is '+str(code))
                # _log.debug('value is '+str(value))
                # _log.debug('data_types is '+str(data_types))
            elif (extended_code_level,code) in data_types:
                data = data_types[(extended_code_level,code)](extended_code_level, code, value)

            else:
                data = ThinkGearUnknownData(extended_code_level, code, value)

            decoded.append(data)

        return decoded


data_types = {}

class ThinkGearMetaClass(type):
    def __new__(mcls, name, bases, data):
        cls = super(ThinkGearMetaClass, mcls).__new__(mcls, name, bases, data)
        code = getattr(cls, 'code', None)
        if code:
            data_types[code] = cls
            extended_code_level = getattr(cls, 'extended_code_level', None)
            if extended_code_level:
                data_types[(extended_code_level,code)] = cls
        return cls


class ThinkGearData(object, metaclass=ThinkGearMetaClass):
    def __init__(self, extended_code_level, code, value):
        self.extended_code_level = extended_code_level
        self.code = code
        # self.value = self._decode(value)
        self.value = value
        # _log.debug('123')
        if self._log:
            _log.log(self._log, '%s', self)

    @staticmethod

    def _decode(v):
        return v

    def __str__(self):
        return self._strfmt % vars(self)

    # __metaclass__ = ThinkGearMetaClass

    _log = logging.DEBUG


class ThinkGearUnknownData(ThinkGearData):
    '''???'''
    _strfmt = 'Unknown: code=%(code)02X extended_code_level=%(extended_code_level)s %(value)r'


class ThinkGearPoorSignalData(ThinkGearData):
    '''POOR_SIGNAL Quality (0-255)'''
    code = 0x02
    _strfmt = 'POOR SIGNAL: %(value)s'
    _decode = staticmethod(ord)


class ThinkGearAttentionData(ThinkGearData):
    '''ATTENTION eSense (0 to 100)'''
    code = 0x04
    _strfmt = 'ATTENTION eSense: %(value)s'
    _decode = staticmethod(ord)


class ThinkGearMeditationData(ThinkGearData):
    '''MEDITATION eSense (0 to 100)'''
    code = 0x05
    _strfmt = 'MEDITATION eSense: %(value)s'
    _decode = staticmethod(ord)


class ThinkGearRawWaveData(ThinkGearData):
    '''RAW Wave Value (-32768 to 32767)'''
    code = 0x80
    _strfmt = 'Raw Wave: %(value)s'
    _decode = staticmethod(lambda v: struct.unpack('>h', v)[0])
    # There are lots of these, don't log them by default
    _log = False


EEGPowerData = namedtuple('EEGPowerData', 'delta theta lowalpha highalpha lowbeta highbeta lowgamma midgamma')
delta_value = namedtuple('EEGPowerData', 'delta')
class ThinkGearEEGPowerData(ThinkGearData):
    '''Eight EEG band power values (0 to 16777215).
    
    delta, theta, low-alpha high-alpha, low-beta, high-beta, low-gamma, and
    mid-gamma EEG band power values.
    '''

    code = 0x83
    _strfmt = 'ASIC EEG Power: %(value)r'
    _decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in range(0, 24, 3)))))
    #print(EEGPowerData.delta)


def main():
    global packet_log
    packet_log = []
    logging.basicConfig(level=logging.DEBUG)

    for pkt in ThinkGearProtocol('COM3').get_packets():
        packet_log.append(pkt)
if __name__ == '__main__':
    main()

при запуске в python2 я получаю такой результат:

DEBUG:__main__:ASIC EEG Power: EEGPowerData(delta=7784, theta=7734, lowalpha=2035, highalpha=1979, lowbeta=2914, highbeta=3996, lowgamma=1944, midgamma=1847

при запуске в python3 результат такой:

DEBUG:__main__:ASIC EEG Power: b'\x00\xa9\xf1\x00t%\x00\rK\x00\x18"\x00\x16%\x00\x1d6\x00OT\x00\x17\x84'

Кто-нибудь знает, как мне отредактировать эту строку кода, чтобы она работала в python 3? Спасибо

_decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in range(0, 24, 3)))))

person zzz    schedule 01.02.2021    source источник
comment
Хорошо, теперь он выглядит завершенным, но также важно, чтобы вы сделали его минимальным. (прочитайте минимальный воспроизводимый пример) - ошибка только в построении data_types dict. ---- в любом случае, вопрос, вероятно, является дубликатом metaclass - __metaclass__ в Python 3 - Stack Overflow , если нет других ошибок..   -  person user202729    schedule 01.02.2021
comment
очень ценю вашу помощь! это сработало. а теперь другие ошибки...   -  person zzz    schedule 01.02.2021
comment
DEBUG:__main__:ASIC Мощность ЭЭГ: b'\x00\r|\x00L\xa3\x00F,\x00\x0b\xca\x00\x1a\x80\x00)#\x006\xec\x00\x0cN'   -  person zzz    schedule 01.02.2021
comment
Каково фактическое и ожидаемое поведение? (отредактируйте вопрос, включая фиксированный код.)   -  person user202729    schedule 01.02.2021
comment
Вы знаете, почему мощность ЭЭГ не расшифровывается в 8 цифр?   -  person zzz    schedule 01.02.2021
comment
Чтобы перейти с Python 2 на 3, вы можете прочитать о python - В чем разница между строкой и строкой байтов? - Переполнение стека тоже.   -  person user202729    schedule 01.02.2021
comment
Я отредактировал фиксированный код.   -  person zzz    schedule 01.02.2021
comment
Я должен получить :DEBUG:__main__:ASIC EEG Power: EEGPowerData(delta=7784, theta=7734, lowalpha=2035, highalpha=1979, lowbeta=2914, highbeta=3996, lowgamma=1944, midgamma=1847), но на самом деле я получаю : DEBUG:__main__:ASIC Мощность ЭЭГ: b'\x00\r|\x00L\xa3\x00F,\x00\x0b\xca\x00\x1a\x80\x00)#\x006\xec\x00\x0cN'   -  person zzz    schedule 01.02.2021
comment
Я попробовал int.from_bytes(), но он не работает в номере 8, не могли бы вы рассказать мне, как это решить?   -  person zzz    schedule 01.02.2021