当前位置:   article > 正文

python libusb多线程_一些有助于理解libusb1.0和ctypes的异步USB操作

libusb 多线程

好吧，我想出来了。transfer.flags 需要的是一个字节而不是整数。我真傻。现在我从 ioctl 得到一个错误代码 errno 16，我想这意味着设备正忙。真是个工作狂。我已经在 libusb 邮件列表上问过了。

以下是我目前所掌握的情况。这其实没有那么多代码，大部分是 libusb 的 ctypes 结构。向下滚动到底部以查看发生错误的实际代码。

from ctypes import *

# USB vendor/product pair handed to libusb_open_device_with_vid_pid() below.
VENDOR_ID, PRODUCT_ID = 0x04d8, 0xc002

# Size, in bytes, of the buffer attached to the interrupt transfer.
_USBLCD_MAX_DATA_LEN = 24

# Direction values that are added to an endpoint number to form the
# endpoint address (the script uses LIBUSB_ENDPOINT_IN + 1).
LIBUSB_ENDPOINT_IN, LIBUSB_ENDPOINT_OUT = 0x80, 0x00

class EnumerationType(type(c_uint)):
    """Metaclass that gives ``c_uint`` subclasses C-style enum behaviour.

    When the class body does not define ``_members_``, every attribute whose
    name does not start with an underscore is collected into a ``_members_``
    dict (name -> value).  Each member is additionally published as a global
    of the module that defines this metaclass, so members can be referenced
    by their bare name, as in C.
    """

    def __new__(metacls, name, bases, dict):
        # ``dict`` shadows the builtin; the name is kept so the signature
        # stays exactly what existing callers see.
        if "_members_" not in dict:
            # Build the mapping first, then store it, so the namespace is
            # not modified while it is being iterated.
            collected = {key: value for key, value in dict.items()
                         if not key.startswith("_")}
            dict["_members_"] = collected
        cls = type(c_uint).__new__(metacls, name, bases, dict)
        # Export every member name into this module's namespace.
        globals().update(cls._members_)
        return cls

    def __contains__(self, value):
        """Return True when *value* is the value of one of the members."""
        return value in self._members_.values()

    def __repr__(self):
        # The previous format string was empty, so formatting always raised
        # TypeError ("not all arguments converted").
        return "<Enumeration %s>" % self.__name__

# The base class is created by calling the metaclass directly.  Subclasses
# inherit their metaclass from it, which works on both Python 2 and Python 3;
# a ``__metaclass__`` class attribute is silently ignored by Python 3.
class Enumeration(EnumerationType("_EnumerationBase", (c_uint,), {"_members_": {}})):
    """A ``c_uint`` whose value must be one of the entries of ``_members_``.

    Subclasses provide ``_members_`` (name -> integer value).  Instances built
    through ``__init__`` carry the matching member name in ``self.name``.
    """

    _members_ = {}

    def __init__(self, value):
        """Initialise from *value*.

        Raises:
            ValueError: if no member of the enumeration has this value.
        """
        for member_name, member_value in self._members_.items():
            if member_value == value:
                self.name = member_name
                break
        else:
            # The 1-tuple keeps the message intact when *value* is a tuple.
            raise ValueError("No enumeration member with value %r" % (value,))
        c_uint.__init__(self, value)

    @classmethod
    def from_param(cls, param):
        """Convert *param* for use as a foreign-function argument.

        An instance of this exact enumeration is passed through unchanged, an
        instance of a different enumeration is rejected with ValueError, and
        anything else is passed to the constructor.
        """
        if isinstance(param, Enumeration):
            if param.__class__ != cls:
                raise ValueError("Cannot mix enumeration members")
            return param
        return cls(param)

    def __repr__(self):
        # The previous format string was empty, so formatting always raised
        # TypeError ("not all arguments converted").
        return "<member %s=%d of %r>" % (self.name, self.value, self.__class__)

class LIBUSB_TRANSFER_STATUS(Enumeration):
    """Transfer status codes.

    Used as the type of the ``status`` field of both ``LIBUSB_TRANSFER`` and
    ``LIBUSB_ISO_PACKET_DESCRIPTOR``.
    """

    _members_ = dict(
        LIBUSB_TRANSFER_COMPLETED=0,
        LIBUSB_TRANSFER_ERROR=1,
        LIBUSB_TRANSFER_TIMED_OUT=2,
        LIBUSB_TRANSFER_CANCELLED=3,
        LIBUSB_TRANSFER_STALL=4,
        LIBUSB_TRANSFER_NO_DEVICE=5,
        LIBUSB_TRANSFER_OVERFLOW=6,
    )

class LIBUSB_TRANSFER_FLAGS(Enumeration):
    """Transfer flag values; each member occupies its own bit.

    NOTE(review): presumably meant for ``LIBUSB_TRANSFER.flags``; nothing in
    this file references these members, so confirm against the libusb API.
    """

    _members_ = dict(
        LIBUSB_TRANSFER_SHORT_NOT_OK=1 << 0,
        LIBUSB_TRANSFER_FREE_BUFFER=1 << 1,
        LIBUSB_TRANSFER_FREE_TRANSFER=1 << 2,
    )

class LIBUSB_TRANSFER_TYPE(Enumeration):
    """Transfer type codes.

    ``LIBUSB_TRANSFER_TYPE_INTERRUPT`` is the value that
    ``libusb_fill_interrupt_transfer`` stores in ``LIBUSB_TRANSFER.type``.
    """

    _members_ = dict(
        LIBUSB_TRANSFER_TYPE_CONTROL=0,
        LIBUSB_TRANSFER_TYPE_ISOCHRONOUS=1,
        LIBUSB_TRANSFER_TYPE_BULK=2,
        LIBUSB_TRANSFER_TYPE_INTERRUPT=3,
    )

class LIBUSB_CONTEXT(Structure):
    """Opaque structure (no fields declared); only used through pointers."""

class LIBUSB_DEVICE(Structure):
    """Opaque structure (no fields declared); only used through pointers."""

class LIBUSB_DEVICE_HANDLE(Structure):
    """Opaque structure; pointers to it are returned when a device is opened
    and are stored in ``LIBUSB_TRANSFER.dev_handle``."""

class LIBUSB_CONTROL_SETUP(Structure):
    """Setup packet for control transfers (``struct libusb_control_setup``).

    The C structure is 8 bytes: two 8-bit fields followed by three 16-bit
    fields.  Declaring every field as ``c_int`` produced a 20-byte layout that
    does not match the setup packet.
    """

    _fields_ = [("bmRequestType", c_uint8),
                ("bRequest", c_uint8),
                ("wValue", c_uint16),
                ("wIndex", c_uint16),
                ("wLength", c_uint16)]

class LIBUSB_ISO_PACKET_DESCRIPTOR(Structure):
    """Descriptor of one isochronous packet
    (``struct libusb_iso_packet_descriptor``).

    ``length`` and ``actual_length`` are ``unsigned int`` in libusb.h, so
    they are declared as ``c_uint`` rather than ``c_int``.
    """

    _fields_ = [("length", c_uint),
                ("actual_length", c_uint),
                ("status", LIBUSB_TRANSFER_STATUS)]

class LIBUSB_TRANSFER(Structure):
    """ctypes mirror of ``struct libusb_transfer``.

    The fields are attached after the class statement because the callback
    type refers to a pointer to this structure.
    """


# Completion callback, ``void (*)(struct libusb_transfer *)``.  The C function
# returns nothing, so the result type is None rather than c_void_p.
LIBUSB_TRANSFER_CB_FN = CFUNCTYPE(None, POINTER(LIBUSB_TRANSFER))

LIBUSB_TRANSFER._fields_ = [
    ("dev_handle", POINTER(LIBUSB_DEVICE_HANDLE)),
    ("flags", c_ubyte),
    ("endpoint", c_ubyte),
    ("type", c_ubyte),
    ("timeout", c_uint),
    ("status", LIBUSB_TRANSFER_STATUS),
    ("length", c_int),
    ("actual_length", c_int),
    ("callback", LIBUSB_TRANSFER_CB_FN),
    ("user_data", c_void_p),
    ("buffer", POINTER(c_ubyte)),
    ("num_iso_packets", c_int),
    # In libusb.h this is a trailing flexible array of descriptors stored
    # inline after the structure, not a pointer.  A zero-length array gives
    # the field the right offset without adding pointer-sized storage.
    ("iso_packet_desc", LIBUSB_ISO_PACKET_DESCRIPTOR * 0),
]

class TIMEVAL(Structure):
    """Seconds/microseconds pair; an instance is passed by reference to
    ``libusb_handle_events_timeout`` as the polling timeout."""

    _fields_ = [
        ("tv_sec", c_long),
        ("tv_usec", c_long),
    ]

# The unversioned "libusb-1.0.so" name is often only present when the
# development package is installed.  Ask the platform for the real library
# name first and fall back to the original hard-coded name.
from ctypes.util import find_library

lib = cdll.LoadLibrary(find_library("usb-1.0") or "libusb-1.0.so")

# ctypes assumes foreign functions return int; these two return pointers, so
# their result types are declared explicitly.
lib.libusb_open_device_with_vid_pid.restype = POINTER(LIBUSB_DEVICE_HANDLE)
lib.libusb_alloc_transfer.restype = POINTER(LIBUSB_TRANSFER)

def libusb_fill_interrupt_transfer(transfer, dev_handle, endpoint, buffer, length, callback, user_data, timeout):
    """Populate the fields of an interrupt transfer.

    Args:
        transfer: pointer to the ``LIBUSB_TRANSFER`` to fill in.
        dev_handle: pointer to the opened device handle.
        endpoint: endpoint address, as an integer.
        buffer: data buffer assigned to the transfer's ``buffer`` field.
        length: length stored in the transfer's ``length`` field.
        callback: Python callable (or an existing ``LIBUSB_TRANSFER_CB_FN``)
            stored as the transfer's completion callback.
        user_data: value stored in the transfer's ``user_data`` field.
        timeout: value stored in the transfer's ``timeout`` field.
    """
    xfer = transfer[0]
    xfer.dev_handle = dev_handle
    # ``endpoint`` and ``type`` are c_ubyte fields and therefore take plain
    # integers; the previous chr() conversion produced a string, which ctypes
    # rejects with TypeError.
    xfer.endpoint = endpoint
    xfer.type = LIBUSB_TRANSFER_TYPE_INTERRUPT
    xfer.timeout = timeout
    xfer.buffer = buffer
    xfer.length = length
    xfer.user_data = user_data
    # Wrap plain Python callables; an already-wrapped callback is used as is.
    if not isinstance(callback, LIBUSB_TRANSFER_CB_FN):
        callback = LIBUSB_TRANSFER_CB_FN(callback)
    xfer.callback = callback

def cb_transfer(transfer):
    """Transfer-completion callback: print the status of the transfer.

    Args:
        transfer: pointer to the completed transfer structure.
    """
    # ``transfer`` is a ctypes pointer, so the structure has to be
    # dereferenced before its fields can be read.
    status = transfer.contents.status
    # Fields typed as a subclass of a ctypes integer come back as ctypes
    # objects, not plain ints; unwrap them so "%d" can format the number.
    status = getattr(status, "value", status)
    print("Transfer status %d" % status)

if __name__ == "__main__":
    # Demo: open the device, submit one interrupt IN transfer on endpoint 1
    # and poll libusb for events until an error is reported.
    r = lib.libusb_init(None)
    if r < 0:
        raise SystemExit("libusb_init failed %s" % r)

    handle = lib.libusb_open_device_with_vid_pid(None, VENDOR_ID, PRODUCT_ID)
    if not handle:
        # A NULL handle means the device could not be opened; continuing
        # would hand a NULL device handle to libusb.
        raise SystemExit("could not open device %04x:%04x" % (VENDOR_ID, PRODUCT_ID))

    # NOTE(review): libusb normally requires libusb_claim_interface() before
    # endpoint I/O (the text above reports errno 16, "busy").  The interface
    # number of this device is not visible here -- confirm and add the call.

    transfer = lib.libusb_alloc_transfer(0)

    size = _USBLCD_MAX_DATA_LEN
    # A real writable buffer of ``size`` bytes.  c_char_p(size) did not
    # allocate anything: it made a pointer to memory address ``size``.
    data = (c_ubyte * size)()

    libusb_fill_interrupt_transfer(transfer, handle, LIBUSB_ENDPOINT_IN + 1, data, size, cb_transfer, None, 0)

    # Negative return values are treated as failures.
    r = lib.libusb_submit_transfer(transfer)
    if r < 0:
        print("libusb_submit_transfer failed %s" % r)

    while r >= 0:
        print("Poll before")
        tv = TIMEVAL(1, 0)  # wait at most one second per poll
        r = lib.libusb_handle_events_timeout(None, byref(tv))
        print("Poll after %s" % r)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/711506
推荐阅读
相关标签
  

闽ICP备14008679号