#!/usr/bin/env python import sys, fnmatch import importlib # peripheral types that can be shared, wildcard patterns SHARED_MAP = ["I2C*", "USART*_TX", "UART*_TX", "SPI*", "TIM*_UP", "TIM*_CH*"] ignore_list = [] dma_map = None debug = False def check_possibility(periph, dma_stream, curr_dict, dma_map, check_list, cannot_use_stream, forbidden_map): global ignore_list if debug: print('............ Checking ', periph, dma_stream, 'without', cannot_use_stream) for other_periph in curr_dict: if other_periph != periph: if curr_dict[other_periph] == dma_stream: if other_periph in forbidden_map[periph]: if debug: print('.................... Forbidden', periph, other_periph) return False if debug: print('.................... Collision', other_periph, dma_stream) ignore_list.append(periph) check_str = "%s(%d,%d) %s(%d,%d)" % ( other_periph, curr_dict[other_periph][0], curr_dict[other_periph][1], periph, dma_stream[0], dma_stream[1]) #check if we did this before if check_str in check_list: return False check_list.append(check_str) if debug: print("Trying to Resolve Conflict: ", check_str) #check if we can resolve by swapping with other periphs for streamchan in dma_map[other_periph]: stream = (streamchan[0], streamchan[1]) if stream != curr_dict[other_periph] and \ stream not in cannot_use_stream and \ check_possibility(other_periph, stream, curr_dict, dma_map, check_list, cannot_use_stream+[(dma_stream)], forbidden_map): curr_dict[other_periph] = stream if debug: print ('....................... Resolving', other_periph, stream) return True if debug: print ('....................... UnSolved !!!!!!!!', periph, dma_stream) return False if debug: print ('....................... Solved ..........', periph, dma_stream) return True def can_share(periph, noshare_list): '''check if a peripheral is in the SHARED_MAP list''' for noshare in noshare_list: if fnmatch.fnmatch(periph, noshare): return False for f in SHARED_MAP: if fnmatch.fnmatch(periph, f): return True if debug: print("%s can't share" % periph) return False # list of peripherals on H7 that are on DMAMUX2 and BDMA have_DMAMUX = False DMAMUX2_peripherals = [ 'I2C4', 'SPI6', 'ADC3' ] def dmamux_channel(key): '''return DMAMUX channel for H7''' for p in DMAMUX2_peripherals: if key.find(p) != -1: return 'STM32_DMAMUX2_' + key # default to DMAMUX1 return 'STM32_DMAMUX1_' + key def dma_name(key): '''return 'DMA' or 'BDMA' based on peripheral name''' if not have_DMAMUX: return "DMA" for p in DMAMUX2_peripherals: if key.find(p) != -1: return 'BDMA' return 'DMA' def chibios_dma_define_name(key): '''return define name needed for board.h for ChibiOS''' dma_key = key + '_' + dma_name(key) if key.startswith('ADC'): return 'STM32_ADC_%s_' % dma_key elif key.startswith('SPI'): return 'STM32_SPI_%s_' % dma_key elif key.startswith('I2C'): return 'STM32_I2C_%s_' % dma_key elif key.startswith('USART'): return 'STM32_UART_%s_' % dma_key elif key.startswith('UART'): return 'STM32_UART_%s_' % dma_key elif key.startswith('SDIO') or key.startswith('SDMMC'): return 'STM32_SDC_%s_' % dma_key elif key.startswith('TIM'): return 'STM32_TIM_%s_' % dma_key else: print("Error: Unknown key type %s" % key) sys.exit(1) def get_list_index(peripheral, priority_list): '''return index into priority_list for a peripheral''' for i in range(len(priority_list)): str = priority_list[i] if fnmatch.fnmatch(peripheral, str): return i # default to max priority return len(priority_list) def get_sharing_priority(periph_list, priority_list): '''get priority of a list of peripherals we could share with''' highest = len(priority_list) for p in periph_list: prio = get_list_index(p, priority_list) if prio < highest: highest = prio return highest def generate_DMAMUX_map_mask(peripheral_list, channel_mask, noshare_list, dma_exclude, stream_ofs): ''' generate a dma map suitable for a board with a DMAMUX In principle any peripheral can use any stream, but we need to ensure that a peripheral doesn't try to use the same stream as its partner (eg. a RX/TX pair) ''' dma_map = {} idsets = {} # first unshareable peripherals available = channel_mask for p in peripheral_list: dma_map[p] = [] idsets[p] = set() for p in peripheral_list: if can_share(p, noshare_list) or p in dma_exclude: continue for i in range(16): mask = (1<= len(idsets[other]) and len(idsets[other]) > 0: continue idsets[other].remove(ii) dma_map[other].remove((dma,stream)) found = ii break if found is None: continue base = (found+1) % 16 dma = (found // 8) + 1 stream = found % 8 dma_map[p].append((dma,stream)) idsets[p].add(found) idx = (idx+1) % 16 if stream_ofs != 0: # add in stream_ofs to cope with STM32G4 for p in dma_map.keys(): for (dma,stream) in dma_map[p]: map2 = [] map2.append((dma,stream+stream_ofs)) dma_map[p] = map2 if debug: print('dma_map: ', dma_map) print('idsets: ', idsets) print('available: 0x%04x' % available) return dma_map def generate_DMAMUX_map(peripheral_list, noshare_list, dma_exclude, stream_ofs): ''' generate a dma map suitable for a board with a DMAMUX1 and DMAMUX2 ''' # first split peripheral_list into those for DMAMUX1 and those for DMAMUX2 dmamux1_peripherals = [] dmamux2_peripherals = [] for p in peripheral_list: if dma_name(p) == 'BDMA': dmamux2_peripherals.append(p) else: dmamux1_peripherals.append(p) map1 = generate_DMAMUX_map_mask(dmamux1_peripherals, 0xFFFF, noshare_list, dma_exclude, stream_ofs) # there are 8 BDMA channels, but an issue has been found where if I2C4 and SPI6 # use neighboring channels then we sometimes lose a BDMA completion interrupt. To # avoid this we set the BDMA available mask to 0x33, which forces the channels not to be # adjacent. This issue was found on a CUAV-X7, with H743 RevV. map2 = generate_DMAMUX_map_mask(dmamux2_peripherals, 0x55, noshare_list, dma_exclude, stream_ofs) # translate entries from map2 to "DMA controller 3", which is used for BDMA for p in map2.keys(): streams = [] for (controller,stream) in map2[p]: streams.append((3,stream)) map2[p] = streams both = map1 both.update(map2) if debug: print('dma_map_both: ', both) return both def sharing_allowed(p1, p2): '''return true if sharing is allowed between p1 and p2''' if p1 == p2: return True # don't allow RX and TX of same peripheral to share if p1.endswith('_RX') and p2.endswith('_TX') and p1[:-2] == p2[:-2]: return False # don't allow sharing of two TIMn_UP channels as DShot code can't cope if p1.endswith("_UP") and p2.endswith("_UP") and p1.startswith("TIM") and p2.startswith("TIM"): return False return True def check_sharing(shared): '''check if DMA channel sharing is OK''' for p in shared: # don't share UART RX with anything if (p.startswith("UART") or p.startswith("USART")) and p.endswith("_RX"): print("Illegal sharing of %s" % p) return False # don't share ADC with anything if p.startswith("ADC"): print("Illegal sharing of %s" % p) return False for p2 in shared: if not sharing_allowed(p, p2): print("Illegal sharing of %s and %s" % (p, p2)) return False return True def forbidden_list(p, peripheral_list): '''given a peripheral, form a list of other peripherals we may not share with''' ret = [] for p2 in peripheral_list: if not sharing_allowed(p, p2): ret.append(p2) return ret def write_dma_header(f, peripheral_list, mcu_type, dma_exclude=[], dma_priority='', dma_noshare=[]): '''write out a DMA resolver header file''' global dma_map, have_DMAMUX, has_bdshot timer_ch_periph = [] has_bdshot = False # form a list of DMA priorities priority_list = dma_priority.split() # sort by priority peripheral_list = sorted(peripheral_list, key=lambda x: get_list_index(x, priority_list)) # form a list of peripherals that can't share noshare_list = dma_noshare[:] try: lib = importlib.import_module(mcu_type) if hasattr(lib, "DMA_Map"): dma_map = lib.DMA_Map else: return [], [] except ImportError: print("Unable to find module for MCU %s" % mcu_type) sys.exit(1) if dma_map is None: have_DMAMUX = True # ensure we don't assign dma for TIMx_CH as we share that with TIMx_UP timer_ch_periph = [periph for periph in peripheral_list if "_CH" in periph] dma_exclude += timer_ch_periph if mcu_type.startswith("STM32G4"): stream_ofs = 1 else: stream_ofs = 0 dma_map = generate_DMAMUX_map(peripheral_list, noshare_list, dma_exclude, stream_ofs) print("Writing DMA map") unassigned = [] curr_dict = {} # build a map from peripheral name to a list of peripherals that it cannot share with forbidden_map = {} for p in peripheral_list: forbidden_map[p] = forbidden_list(p, peripheral_list) for periph in peripheral_list: if "_CH" in periph: has_bdshot = True # the list contains a CH port if periph in dma_exclude: continue assigned = False check_list = [] if not periph in dma_map: print("Unknown peripheral function %s in DMA map for %s" % (periph, mcu_type)) sys.exit(1) if debug: print('\n\n.....Starting lookup for', periph) for streamchan in dma_map[periph]: if debug: print('........Possibility for', periph, streamchan) stream = (streamchan[0], streamchan[1]) if check_possibility(periph, stream, curr_dict, dma_map, check_list, [], forbidden_map): curr_dict[periph] = stream if debug: print ('....................... Setting', periph, stream) assigned = True break if assigned == False: unassigned.append(periph) if debug: print('curr_dict: ', curr_dict) print('unassigned: ', unassigned) # now look for shared DMA possibilities stream_assign = {} for k in curr_dict.keys(): p = curr_dict[k] if not p in stream_assign: stream_assign[p] = [k] else: stream_assign[p].append(k) unassigned_new = unassigned[:] for periph in unassigned: share_possibility = [] for streamchan in dma_map[periph]: stream = (streamchan[0], streamchan[1]) share_ok = True for periph2 in stream_assign[stream]: if not can_share(periph, noshare_list) or not can_share(periph2, noshare_list) or periph2 in forbidden_map[periph]: share_ok = False if share_ok: share_possibility.append(stream) if share_possibility: # sort the possible sharings so minimise impact on high priority streams share_possibility = sorted(share_possibility, key=lambda x: get_sharing_priority(stream_assign[x], priority_list)) # and take the one with the least impact (lowest value for highest priority stream share) stream = share_possibility[-1] if debug: print("Sharing %s on %s with %s" % (periph, stream, stream_assign[stream])) curr_dict[periph] = stream stream_assign[stream].append(periph) unassigned_new.remove(periph) unassigned = unassigned_new for key in sorted(curr_dict.keys()): stream = curr_dict[key] if len(stream_assign[stream]) > 1: if not check_sharing(stream_assign[stream]): sys.exit(1) if debug: print(stream_assign) f.write("\n\n// auto-generated DMA mapping from dma_resolver.py\n") if unassigned: f.write( "\n// Note: The following peripherals can't be resolved for DMA: %s\n\n" % unassigned) ordered_up_channels = [] # produce a list of timers ordered by the DMA streamid of the UP channel # this is so that the dshot code can take out the UP DMA locks in the same order as I2C and SPI for key in curr_dict.keys(): if "_UP" in key: ordered_up_channels.append(key) def order_by_streamid(key): stream = curr_dict[key] return (stream[0] * 8 + stream[1]) * 20 + int(key[3:-3]) ordered_up_channels = sorted(ordered_up_channels, key=order_by_streamid) ordered_timers = [] for key in ordered_up_channels: ordered_timers.append(key[0:-3]) shared_set = set() for key in sorted(curr_dict.keys()): stream = curr_dict[key] shared = '' if len(stream_assign[stream]) > 1: shared = ' // shared %s' % ','.join(stream_assign[stream]) if stream[0] in [1,2]: shared_set.add("(1U<