Compare commits

..

3 Commits

Author SHA1 Message Date
Mr_Goldberg acebfc4e4d
Fix controller config generator. 2021-09-15 02:38:54 -04:00
Mr_Goldberg a24a9c266f
Make the TriggerVibrationvibration behave closer to real steam. 2021-09-14 16:11:17 -04:00
Mr_Goldberg b3a5102a3c
Add script to generate controller configs from vdf files. 2021-09-14 16:08:41 -04:00
7 changed files with 1028 additions and 4 deletions

View File

@ -133,6 +133,7 @@ For analog actions (joysticks, triggers): ACTION_NAME=ANALOG_NAME=input source m
Actions can be bound to more than one button by separating the buttons with , like this: ACTION_NAME=A,B
If you want to configure a game yourself, find the xbox360 or xbox one vdf file for the game and you should be able to figure things out.
You can also use the scripts\controller_config_generator\parse_controller_vdf.py script in the emu source code repo.
For example to get the vdf file for the game Crystar: https://steamdb.info/app/981750/config/
If you look at: steamcontrollerconfigdetails, you will see something like: 1779660455/controller_type: controller_xbox360

View File

@ -262,6 +262,7 @@ STEAMAPI_API void S_CALLTYPE SteamAPI_Shutdown()
PRINT_DEBUG("SteamAPI_Shutdown\n");
get_steam_client()->clientShutdown();
get_steam_client()->BReleaseSteamPipe(user_steam_pipe);
get_steam_client()->BShutdownIfAllPipesClosed();
user_steam_pipe = 0;
--global_counter;
old_user_instance = NULL;
@ -652,6 +653,7 @@ STEAMAPI_API void SteamGameServer_Shutdown()
PRINT_DEBUG("SteamGameServer_Shutdown\n");
get_steam_client()->serverShutdown();
get_steam_client()->BReleaseSteamPipe(server_steam_pipe);
get_steam_client()->BShutdownIfAllPipesClosed();
server_steam_pipe = 0;
--global_counter;
g_pSteamClientGameServer = NULL; //TODO: check if this actually gets nulled when SteamGameServer_Shutdown is called

View File

@ -794,9 +794,14 @@ bool Steam_Client::BShutdownIfAllPipesClosed()
{
PRINT_DEBUG("BShutdownIfAllPipesClosed\n");
if (!steam_pipes.size()) {
if (background_keepalive.joinable()) {
bool joinable = background_keepalive.joinable();
if (joinable) {
kill_background_thread = true;
kill_background_thread_cv.notify_one();
}
steam_controller->Shutdown();
if (joinable) {
background_keepalive.join();
}

View File

@ -63,6 +63,19 @@ struct Controller_Action {
}
};
struct Rumble_Thread_Data {
std::condition_variable rumble_thread_cv;
std::atomic_bool kill_rumble_thread;
std::mutex rumble_mutex;
struct Rumble_Data {
unsigned short left, right, last_left, last_right;
unsigned int rumble_length_ms;
bool new_data;
} data[GAMEPAD_COUNT];
};
enum EXTRA_GAMEPAD_BUTTONS {
BUTTON_LTRIGGER = BUTTON_COUNT + 1,
BUTTON_RTRIGGER = BUTTON_COUNT + 2,
@ -146,6 +159,9 @@ public ISteamInput
std::map<EInputActionOrigin, std::string> steaminput_glyphs;
std::map<EControllerActionOrigin, std::string> steamcontroller_glyphs;
std::thread background_rumble_thread;
Rumble_Thread_Data *rumble_thread_data;
bool disabled;
bool initialized;
@ -210,6 +226,66 @@ public ISteamInput
public:
static void background_rumble(Rumble_Thread_Data *data)
{
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
bool rumbled = false;
while (true) {
bool new_data = false;
if (rumbled) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
data->rumble_mutex.lock();
for (int i = 0; i < GAMEPAD_COUNT; ++i) {
if (data->data[i].new_data) {
new_data = true;
break;
}
}
data->rumble_mutex.unlock();
if (data->kill_rumble_thread) {
return;
}
}
bool x = new_data || data->rumble_thread_cv.wait_for(lck, std::chrono::milliseconds(100)) != std::cv_status::timeout;
if (data->kill_rumble_thread) {
return;
}
rumbled = false;
while (true) {
unsigned short left, right;
unsigned int rumble_length_ms;
int gamepad = -1;
data->rumble_mutex.lock();
for (int i = 0; i < GAMEPAD_COUNT; ++i) {
if (data->data[i].new_data) {
left = data->data[i].left;
right = data->data[i].right;
rumble_length_ms = data->data[i].rumble_length_ms;
data->data[i].new_data = false;
if (data->data[i].last_left != left || data->data[i].last_right != right) {
gamepad = i;
data->data[i].last_left = left;
data->data[i].last_right = right;
break;
}
}
}
data->rumble_mutex.unlock();
if (gamepad == -1) {
break;
}
GamepadSetRumble((GAMEPAD_DEVICE)gamepad, ((double)left) / 65535.0, ((double)right) / 65535.0, rumble_length_ms);
rumbled = true;
}
}
}
static void steam_run_every_runcb(void *object)
{
PRINT_DEBUG("steam_controller_run_every_runcb\n");
@ -235,6 +311,7 @@ Steam_Controller(class Settings *settings, class SteamCallResults *callback_resu
~Steam_Controller()
{
//TODO rm network callbacks
//TODO rumble thread
this->run_every_runcb->remove(&Steam_Controller::steam_run_every_runcb, this);
}
@ -243,7 +320,7 @@ bool Init()
{
PRINT_DEBUG("Steam_Controller::Init()\n");
std::lock_guard<std::recursive_mutex> lock(global_mutex);
if (disabled) {
if (disabled || initialized) {
return true;
}
@ -261,6 +338,9 @@ bool Init()
controllers.insert(std::pair<ControllerHandle_t, struct Controller_Action>(i, cont_action));
}
rumble_thread_data = new Rumble_Thread_Data();
background_rumble_thread = std::thread(background_rumble, rumble_thread_data);
initialized = true;
return true;
}
@ -275,11 +355,17 @@ bool Shutdown()
{
PRINT_DEBUG("Steam_Controller::Shutdown()\n");
std::lock_guard<std::recursive_mutex> lock(global_mutex);
if (disabled) {
if (disabled || !initialized) {
return true;
}
controllers = std::map<ControllerHandle_t, struct Controller_Action>();
rumble_thread_data->kill_rumble_thread = true;
rumble_thread_data->rumble_thread_cv.notify_one();
background_rumble_thread.join();
delete rumble_thread_data;
GamepadShutdown();
initialized = false;
return true;
}
@ -350,6 +436,7 @@ ControllerActionSetHandle_t GetActionSetHandle( const char *pszActionSetName )
auto set_handle = action_handles.find(upper_action_name);
if (set_handle == action_handles.end()) return 0;
PRINT_DEBUG("Steam_Controller::GetActionSetHandle %s ret %llu\n", pszActionSetName, set_handle->second);
return set_handle->second;
}
@ -417,6 +504,7 @@ ControllerDigitalActionHandle_t GetDigitalActionHandle( const char *pszActionNam
auto handle = digital_action_handles.find(upper_action_name);
if (handle == digital_action_handles.end()) return 0;
PRINT_DEBUG("Steam_Controller::GetDigitalActionHandle %s ret %llu\n", pszActionName, handle->second);
return handle->second;
}
@ -762,7 +850,16 @@ void TriggerVibration( ControllerHandle_t controllerHandle, unsigned short usLef
//FIXME: shadow of the tomb raider on linux doesn't seem to turn off the rumble so I made it expire after 100ms. Need to check if this is how linux steam actually behaves.
rumble_length_ms = 100;
#endif
GamepadSetRumble((GAMEPAD_DEVICE)(controllerHandle - 1), ((double)usLeftSpeed) / 65535.0, ((double)usRightSpeed) / 65535.0, rumble_length_ms);
unsigned gamepad_device = (controllerHandle - 1);
if (gamepad_device > GAMEPAD_COUNT) return;
rumble_thread_data->rumble_mutex.lock();
rumble_thread_data->data[gamepad_device].new_data = true;
rumble_thread_data->data[gamepad_device].left = usLeftSpeed;
rumble_thread_data->data[gamepad_device].right = usRightSpeed;
rumble_thread_data->data[gamepad_device].rumble_length_ms = rumble_length_ms;
rumble_thread_data->rumble_mutex.unlock();
rumble_thread_data->rumble_thread_cv.notify_one();
}

View File

@ -0,0 +1,177 @@
#controller vdf script by mr_goldberg
#generates controller config from a vdf
import vdf
import sys
import os
keymap_digital = {
"button_a": "A",
"button_b": "B",
"button_x": "X",
"button_y": "Y",
"dpad_north": "DUP",
"dpad_south": "DDOWN",
"dpad_east": "DRIGHT",
"dpad_west": "DLEFT",
"button_escape": "START",
"button_menu": "BACK",
"left_bumper": "LBUMPER",
"right_bumper": "RBUMPER",
"button_back_left": "A",
"button_back_right": "X",
"": "",
"": "",
"": "",
"": "",
"": "",
"": "",
}
def add_input_bindings(group, bindings, force_binding=None, keymap=keymap_digital):
for i in group["inputs"]:
for act in group["inputs"][i]:
for fp in group["inputs"][i][act]:
for bd in group["inputs"][i][act][fp]:
for bbd in group["inputs"][i][act][fp][bd]:
if bbd == 'binding':
x = group["inputs"][i][act][fp][bd].get_all_for(bbd)
for ss in x:
st = ss.split()
if st[0] == 'game_action':
if st[2][-1] == ",":
action_name = st[2][:-1]
else:
action_name = st[2][:]
if force_binding is None:
binding = keymap[i.lower()]
else:
binding = force_binding
if action_name in bindings:
if binding not in bindings[action_name]:
bindings[action_name].append(binding)
else:
bindings[action_name] = [binding]
return bindings
if len(sys.argv) < 2:
print("format: {} xbox_controller_config.vdf".format(sys.argv[0]))
exit(0)
with open(sys.argv[1], 'rb') as f:
t = f.read().decode('utf-8')
d = vdf.loads(t, mapper=vdf.VDFDict, merge_duplicate_keys=False)
controller_mappings = d["controller_mappings"]
groups = controller_mappings.get_all_for("group")
groups_byid = {}
for g in groups:
groups_byid[g["id"]] = g
actions = controller_mappings.get_all_for("actions")
action_list = []
for a in actions:
for k in a:
action_list.append(k)
presets = controller_mappings.get_all_for("preset")
all_bindings = {}
for p in presets:
name = p["name"]
if name not in action_list:
continue
group_bindings = p["group_source_bindings"]
bindings = {}
for number in group_bindings:
s = group_bindings[number].split()
if s[1] != "active":
continue
#print(s)
if s[0] in ["switch", "button_diamond", "dpad"]:
group = groups_byid[number]
#print(group)
bindings = add_input_bindings(group, bindings)
if s[0] in ["left_trigger", "right_trigger"]:
group = groups_byid[number]
if group["mode"] == "trigger":
for g in group:
if g == "gameactions":
#print(group)
action_name = group["gameactions"][name]
if s[0] == "left_trigger":
binding = "LTRIGGER"
else:
binding = "RTRIGGER"
if action_name in bindings:
if binding not in bindings[action_name] and (binding + "=trigger") not in bindings[action_name]:
bindings[action_name].insert(0, binding)
else:
bindings[action_name] = [binding + "=trigger"]
if g == "inputs":
if s[0] == "left_trigger":
binding = "DLTRIGGER"
else:
binding = "DRTRIGGER"
bindings = add_input_bindings(group, bindings, binding)
else:
print("unhandled trigger mode", group["mode"])
if s[0] in ["joystick", "right_joystick", "dpad"]:
group = groups_byid[number]
if group["mode"] == "joystick_move":
for g in group:
if g == "gameactions":
#print(group)
action_name = group["gameactions"][name]
if s[0] == "joystick":
binding = "LJOY"
elif s[0] == "right_joystick":
binding = "RJOY"
elif s[0] == "dpad":
binding = "DPAD"
else:
print("could not handle", s[0])
if action_name in bindings:
if binding not in bindings[action_name] and (binding + "=joystick_move") not in bindings[action_name]:
bindings[action_name].insert(0, binding)
else:
bindings[action_name] = [binding + "=joystick_move"]
if g == "inputs":
if s[0] == "joystick":
binding = "LSTICK"
else:
binding = "RSTICK"
bindings = add_input_bindings(group, bindings, binding)
elif group["mode"] == "dpad":
if s[0] == "joystick":
binding_map = {"dpad_north":"DLJOYUP", "dpad_south": "DLJOYDOWN", "dpad_west": "DLJOYLEFT", "dpad_east": "DLJOYRIGHT"}
bindings = add_input_bindings(group, bindings, keymap=binding_map)
elif s[0] == "right_joystick":
binding_map = {"dpad_north":"DRJOYUP", "dpad_south": "DRJOYDOWN", "dpad_west": "DRJOYLEFT", "dpad_east": "DRJOYRIGHT"}
bindings = add_input_bindings(group, bindings, keymap=binding_map)
else:
if s[0] != "dpad":
print("no pad", s[0])
else:
print("unhandled joy mode", group["mode"])
all_bindings[name] = bindings
#print(controller_mappings["preset"][(0, "group_source_bindings")])
#print(all_bindings)
config_directory = sys.argv[1] + "_config/steam_settings" + "/controller/"
if not os.path.exists(config_directory):
os.makedirs(config_directory)
for k in all_bindings:
with open(config_directory + k + '.txt', 'w') as f:
for b in all_bindings[k]:
f.write(b + "=" + ','.join(all_bindings[k][b]) + "\n")

View File

@ -0,0 +1,521 @@
"""
Module for deserializing/serializing to and from VDF
"""
__version__ = "3.4"
__author__ = "Rossen Georgiev"
import re
import sys
import struct
from binascii import crc32
from io import BytesIO
from io import StringIO as unicodeIO
try:
from collections.abc import Mapping
except:
from collections import Mapping
from vdf.vdict import VDFDict
# Py2 & Py3 compatibility
if sys.version_info[0] >= 3:
string_type = str
int_type = int
BOMS = '\ufffe\ufeff'
def strip_bom(line):
return line.lstrip(BOMS)
else:
from StringIO import StringIO as strIO
string_type = basestring
int_type = long
BOMS = '\xef\xbb\xbf\xff\xfe\xfe\xff'
BOMS_UNICODE = '\\ufffe\\ufeff'.decode('unicode-escape')
def strip_bom(line):
return line.lstrip(BOMS if isinstance(line, str) else BOMS_UNICODE)
# string escaping
_unescape_char_map = {
r"\n": "\n",
r"\t": "\t",
r"\v": "\v",
r"\b": "\b",
r"\r": "\r",
r"\f": "\f",
r"\a": "\a",
r"\\": "\\",
r"\?": "?",
r"\"": "\"",
r"\'": "\'",
}
_escape_char_map = {v: k for k, v in _unescape_char_map.items()}
def _re_escape_match(m):
return _escape_char_map[m.group()]
def _re_unescape_match(m):
return _unescape_char_map[m.group()]
def _escape(text):
return re.sub(r"[\n\t\v\b\r\f\a\\\?\"']", _re_escape_match, text)
def _unescape(text):
return re.sub(r"(\\n|\\t|\\v|\\b|\\r|\\f|\\a|\\\\|\\\?|\\\"|\\')", _re_unescape_match, text)
# parsing and dumping for KV1
def parse(fp, mapper=dict, merge_duplicate_keys=True, escaped=True):
"""
Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a VDF)
to a Python object.
``mapper`` specifies the Python object used after deserializetion. ``dict` is
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
wish to preserve key order. Or any object that acts like a ``dict``.
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
same key into one instead of overwriting. You can se this to ``False`` if you are
using ``VDFDict`` and need to preserve the duplicates.
"""
if not issubclass(mapper, Mapping):
raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
if not hasattr(fp, 'readline'):
raise TypeError("Expected fp to be a file-like object supporting line iteration")
stack = [mapper()]
expect_bracket = False
re_keyvalue = re.compile(r'^("(?P<qkey>(?:\\.|[^\\"])*)"|(?P<key>#?[a-z0-9\-\_\\\?$%<>]+))'
r'([ \t]*('
r'"(?P<qval>(?:\\.|[^\\"])*)(?P<vq_end>")?'
r'|(?P<val>(?:(?<!/)/(?!/)|[a-z0-9\-\_\\\?\*\.$<> ])+)'
r'|(?P<sblock>{[ \t]*)(?P<eblock>})?'
r'))?',
flags=re.I)
for lineno, line in enumerate(fp, 1):
if lineno == 1:
line = strip_bom(line)
line = line.lstrip()
# skip empty and comment lines
if line == "" or line[0] == '/':
continue
# one level deeper
if line[0] == "{":
expect_bracket = False
continue
if expect_bracket:
raise SyntaxError("vdf.parse: expected openning bracket",
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 1, line))
# one level back
if line[0] == "}":
if len(stack) > 1:
stack.pop()
continue
raise SyntaxError("vdf.parse: one too many closing parenthasis",
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
# parse keyvalue pairs
while True:
match = re_keyvalue.match(line)
if not match:
try:
line += next(fp)
continue
except StopIteration:
raise SyntaxError("vdf.parse: unexpected EOF (open key quote?)",
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
key = match.group('key') if match.group('qkey') is None else match.group('qkey')
val = match.group('qval')
if val is None:
val = match.group('val')
if val is not None:
val = val.rstrip()
if val == "":
val = None
if escaped:
key = _unescape(key)
# we have a key with value in parenthesis, so we make a new dict obj (level deeper)
if val is None:
if merge_duplicate_keys and key in stack[-1]:
_m = stack[-1][key]
# we've descended a level deeper, if value is str, we have to overwrite it to mapper
if not isinstance(_m, mapper):
_m = stack[-1][key] = mapper()
else:
_m = mapper()
stack[-1][key] = _m
if match.group('eblock') is None:
# only expect a bracket if it's not already closed or on the same line
stack.append(_m)
if match.group('sblock') is None:
expect_bracket = True
# we've matched a simple keyvalue pair, map it to the last dict obj in the stack
else:
# if the value is line consume one more line and try to match again,
# until we get the KeyValue pair
if match.group('vq_end') is None and match.group('qval') is not None:
try:
line += next(fp)
continue
except StopIteration:
raise SyntaxError("vdf.parse: unexpected EOF (open quote for value?)",
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
stack[-1][key] = _unescape(val) if escaped else val
# exit the loop
break
if len(stack) != 1:
raise SyntaxError("vdf.parse: unclosed parenthasis or quotes (EOF)",
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
return stack.pop()
def loads(s, **kwargs):
"""
Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a JSON
document) to a Python object.
"""
if not isinstance(s, string_type):
raise TypeError("Expected s to be a str, got %s" % type(s))
try:
fp = unicodeIO(s)
except TypeError:
fp = strIO(s)
return parse(fp, **kwargs)
def load(fp, **kwargs):
"""
Deserialize ``fp`` (a ``.readline()``-supporting file-like object containing
a JSON document) to a Python object.
"""
return parse(fp, **kwargs)
def dumps(obj, pretty=False, escaped=True):
"""
Serialize ``obj`` to a VDF formatted ``str``.
"""
if not isinstance(obj, Mapping):
raise TypeError("Expected data to be an instance of``dict``")
if not isinstance(pretty, bool):
raise TypeError("Expected pretty to be of type bool")
if not isinstance(escaped, bool):
raise TypeError("Expected escaped to be of type bool")
return ''.join(_dump_gen(obj, pretty, escaped))
def dump(obj, fp, pretty=False, escaped=True):
"""
Serialize ``obj`` as a VDF formatted stream to ``fp`` (a
``.write()``-supporting file-like object).
"""
if not isinstance(obj, Mapping):
raise TypeError("Expected data to be an instance of``dict``")
if not hasattr(fp, 'write'):
raise TypeError("Expected fp to have write() method")
if not isinstance(pretty, bool):
raise TypeError("Expected pretty to be of type bool")
if not isinstance(escaped, bool):
raise TypeError("Expected escaped to be of type bool")
for chunk in _dump_gen(obj, pretty, escaped):
fp.write(chunk)
def _dump_gen(data, pretty=False, escaped=True, level=0):
indent = "\t"
line_indent = ""
if pretty:
line_indent = indent * level
for key, value in data.items():
if escaped and isinstance(key, string_type):
key = _escape(key)
if isinstance(value, Mapping):
yield '%s"%s"\n%s{\n' % (line_indent, key, line_indent)
for chunk in _dump_gen(value, pretty, escaped, level+1):
yield chunk
yield "%s}\n" % line_indent
else:
if escaped and isinstance(value, string_type):
value = _escape(value)
yield '%s"%s" "%s"\n' % (line_indent, key, value)
# binary VDF
class BASE_INT(int_type):
def __repr__(self):
return "%s(%d)" % (self.__class__.__name__, self)
class UINT_64(BASE_INT):
pass
class INT_64(BASE_INT):
pass
class POINTER(BASE_INT):
pass
class COLOR(BASE_INT):
pass
BIN_NONE = b'\x00'
BIN_STRING = b'\x01'
BIN_INT32 = b'\x02'
BIN_FLOAT32 = b'\x03'
BIN_POINTER = b'\x04'
BIN_WIDESTRING = b'\x05'
BIN_COLOR = b'\x06'
BIN_UINT64 = b'\x07'
BIN_END = b'\x08'
BIN_INT64 = b'\x0A'
BIN_END_ALT = b'\x0B'
def binary_loads(b, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=True):
"""
Deserialize ``b`` (``bytes`` containing a VDF in "binary form")
to a Python object.
``mapper`` specifies the Python object used after deserializetion. ``dict` is
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
wish to preserve key order. Or any object that acts like a ``dict``.
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
same key into one instead of overwriting. You can se this to ``False`` if you are
using ``VDFDict`` and need to preserve the duplicates.
"""
if not isinstance(b, bytes):
raise TypeError("Expected s to be bytes, got %s" % type(b))
return binary_load(BytesIO(b), mapper, merge_duplicate_keys, alt_format, raise_on_remaining)
def binary_load(fp, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=False):
"""
Deserialize ``fp`` (a ``.read()``-supporting file-like object containing
binary VDF) to a Python object.
``mapper`` specifies the Python object used after deserializetion. ``dict` is
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
wish to preserve key order. Or any object that acts like a ``dict``.
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
same key into one instead of overwriting. You can se this to ``False`` if you are
using ``VDFDict`` and need to preserve the duplicates.
"""
if not hasattr(fp, 'read') or not hasattr(fp, 'tell') or not hasattr(fp, 'seek'):
raise TypeError("Expected fp to be a file-like object with tell()/seek() and read() returning bytes")
if not issubclass(mapper, Mapping):
raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
# helpers
int32 = struct.Struct('<i')
uint64 = struct.Struct('<Q')
int64 = struct.Struct('<q')
float32 = struct.Struct('<f')
def read_string(fp, wide=False):
buf, end = b'', -1
offset = fp.tell()
# locate string end
while end == -1:
chunk = fp.read(64)
if chunk == b'':
raise SyntaxError("Unterminated cstring (offset: %d)" % offset)
buf += chunk
end = buf.find(b'\x00\x00' if wide else b'\x00')
if wide:
end += end % 2
# rewind fp
fp.seek(end - len(buf) + (2 if wide else 1), 1)
# decode string
result = buf[:end]
if wide:
result = result.decode('utf-16')
elif bytes is not str:
result = result.decode('utf-8', 'replace')
else:
try:
result.decode('ascii')
except:
result = result.decode('utf-8', 'replace')
return result
stack = [mapper()]
CURRENT_BIN_END = BIN_END if not alt_format else BIN_END_ALT
for t in iter(lambda: fp.read(1), b''):
if t == CURRENT_BIN_END:
if len(stack) > 1:
stack.pop()
continue
break
key = read_string(fp)
if t == BIN_NONE:
if merge_duplicate_keys and key in stack[-1]:
_m = stack[-1][key]
else:
_m = mapper()
stack[-1][key] = _m
stack.append(_m)
elif t == BIN_STRING:
stack[-1][key] = read_string(fp)
elif t == BIN_WIDESTRING:
stack[-1][key] = read_string(fp, wide=True)
elif t in (BIN_INT32, BIN_POINTER, BIN_COLOR):
val = int32.unpack(fp.read(int32.size))[0]
if t == BIN_POINTER:
val = POINTER(val)
elif t == BIN_COLOR:
val = COLOR(val)
stack[-1][key] = val
elif t == BIN_UINT64:
stack[-1][key] = UINT_64(uint64.unpack(fp.read(int64.size))[0])
elif t == BIN_INT64:
stack[-1][key] = INT_64(int64.unpack(fp.read(int64.size))[0])
elif t == BIN_FLOAT32:
stack[-1][key] = float32.unpack(fp.read(float32.size))[0]
else:
raise SyntaxError("Unknown data type at offset %d: %s" % (fp.tell() - 1, repr(t)))
if len(stack) != 1:
raise SyntaxError("Reached EOF, but Binary VDF is incomplete")
if raise_on_remaining and fp.read(1) != b'':
fp.seek(-1, 1)
raise SyntaxError("Binary VDF ended at offset %d, but there is more data remaining" % (fp.tell() - 1))
return stack.pop()
def binary_dumps(obj, alt_format=False):
"""
Serialize ``obj`` to a binary VDF formatted ``bytes``.
"""
buf = BytesIO()
binary_dump(obj, buf, alt_format)
return buf.getvalue()
def binary_dump(obj, fp, alt_format=False):
"""
Serialize ``obj`` to a binary VDF formatted ``bytes`` and write it to ``fp`` filelike object
"""
if not isinstance(obj, Mapping):
raise TypeError("Expected obj to be type of Mapping")
if not hasattr(fp, 'write'):
raise TypeError("Expected fp to have write() method")
for chunk in _binary_dump_gen(obj, alt_format=alt_format):
fp.write(chunk)
def _binary_dump_gen(obj, level=0, alt_format=False):
if level == 0 and len(obj) == 0:
return
int32 = struct.Struct('<i')
uint64 = struct.Struct('<Q')
int64 = struct.Struct('<q')
float32 = struct.Struct('<f')
for key, value in obj.items():
if isinstance(key, string_type):
key = key.encode('utf-8')
else:
raise TypeError("dict keys must be of type str, got %s" % type(key))
if isinstance(value, Mapping):
yield BIN_NONE + key + BIN_NONE
for chunk in _binary_dump_gen(value, level+1, alt_format=alt_format):
yield chunk
elif isinstance(value, UINT_64):
yield BIN_UINT64 + key + BIN_NONE + uint64.pack(value)
elif isinstance(value, INT_64):
yield BIN_INT64 + key + BIN_NONE + int64.pack(value)
elif isinstance(value, string_type):
try:
value = value.encode('utf-8') + BIN_NONE
yield BIN_STRING
except:
value = value.encode('utf-16') + BIN_NONE*2
yield BIN_WIDESTRING
yield key + BIN_NONE + value
elif isinstance(value, float):
yield BIN_FLOAT32 + key + BIN_NONE + float32.pack(value)
elif isinstance(value, (COLOR, POINTER, int, int_type)):
if isinstance(value, COLOR):
yield BIN_COLOR
elif isinstance(value, POINTER):
yield BIN_POINTER
else:
yield BIN_INT32
yield key + BIN_NONE
yield int32.pack(value)
else:
raise TypeError("Unsupported type: %s" % type(value))
yield BIN_END if not alt_format else BIN_END_ALT
def vbkv_loads(s, mapper=dict, merge_duplicate_keys=True):
"""
Deserialize ``s`` (``bytes`` containing a VBKV to a Python object.
``mapper`` specifies the Python object used after deserializetion. ``dict` is
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
wish to preserve key order. Or any object that acts like a ``dict``.
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
same key into one instead of overwriting. You can se this to ``False`` if you are
using ``VDFDict`` and need to preserve the duplicates.
"""
if s[:4] != b'VBKV':
raise ValueError("Invalid header")
checksum, = struct.unpack('<i', s[4:8])
if checksum != crc32(s[8:]):
raise ValueError("Invalid checksum")
return binary_loads(s[8:], mapper, merge_duplicate_keys, alt_format=True)
def vbkv_dumps(obj):
"""
Serialize ``obj`` to a VBKV formatted ``bytes``.
"""
data = b''.join(_binary_dump_gen(obj, alt_format=True))
checksum = crc32(data)
return b'VBKV' + struct.pack('<i', checksum) + data

View File

@ -0,0 +1,221 @@
import sys
from collections import Counter
if sys.version_info[0] >= 3:
_iter_values = 'values'
_range = range
_string_type = str
import collections.abc as _c
class _kView(_c.KeysView):
def __iter__(self):
return self._mapping.iterkeys()
class _vView(_c.ValuesView):
def __iter__(self):
return self._mapping.itervalues()
class _iView(_c.ItemsView):
def __iter__(self):
return self._mapping.iteritems()
else:
_iter_values = 'itervalues'
_range = xrange
_string_type = basestring
_kView = lambda x: list(x.iterkeys())
_vView = lambda x: list(x.itervalues())
_iView = lambda x: list(x.iteritems())
class VDFDict(dict):
def __init__(self, data=None):
"""
This is a dictionary that supports duplicate keys and preserves insert order
``data`` can be a ``dict``, or a sequence of key-value tuples. (e.g. ``[('key', 'value'),..]``)
The only supported type for key is str.
Get/set duplicates is done by tuples ``(index, key)``, where index is the duplicate index
for the specified key. (e.g. ``(0, 'key')``, ``(1, 'key')``...)
When the ``key`` is ``str``, instead of tuple, set will create a duplicate and get will look up ``(0, key)``
"""
self.__omap = []
self.__kcount = Counter()
if data is not None:
if not isinstance(data, (list, dict)):
raise ValueError("Expected data to be list of pairs or dict, got %s" % type(data))
self.update(data)
def __repr__(self):
out = "%s(" % self.__class__.__name__
out += "%s)" % repr(list(self.iteritems()))
return out
def __len__(self):
return len(self.__omap)
def _verify_key_tuple(self, key):
if len(key) != 2:
raise ValueError("Expected key tuple length to be 2, got %d" % len(key))
if not isinstance(key[0], int):
raise TypeError("Key index should be an int")
if not isinstance(key[1], _string_type):
raise TypeError("Key value should be a str")
def _normalize_key(self, key):
if isinstance(key, _string_type):
key = (0, key)
elif isinstance(key, tuple):
self._verify_key_tuple(key)
else:
raise TypeError("Expected key to be a str or tuple, got %s" % type(key))
return key
def __setitem__(self, key, value):
if isinstance(key, _string_type):
key = (self.__kcount[key], key)
self.__omap.append(key)
elif isinstance(key, tuple):
self._verify_key_tuple(key)
if key not in self:
raise KeyError("%s doesn't exist" % repr(key))
else:
raise TypeError("Expected either a str or tuple for key")
super(VDFDict, self).__setitem__(key, value)
self.__kcount[key[1]] += 1
def __getitem__(self, key):
return super(VDFDict, self).__getitem__(self._normalize_key(key))
def __delitem__(self, key):
key = self._normalize_key(key)
result = super(VDFDict, self).__delitem__(key)
start_idx = self.__omap.index(key)
del self.__omap[start_idx]
dup_idx, skey = key
self.__kcount[skey] -= 1
tail_count = self.__kcount[skey] - dup_idx
if tail_count > 0:
for idx in _range(start_idx, len(self.__omap)):
if self.__omap[idx][1] == skey:
oldkey = self.__omap[idx]
newkey = (dup_idx, skey)
super(VDFDict, self).__setitem__(newkey, self[oldkey])
super(VDFDict, self).__delitem__(oldkey)
self.__omap[idx] = newkey
dup_idx += 1
tail_count -= 1
if tail_count == 0:
break
if self.__kcount[skey] == 0:
del self.__kcount[skey]
return result
def __iter__(self):
return iter(self.iterkeys())
def __contains__(self, key):
return super(VDFDict, self).__contains__(self._normalize_key(key))
def __eq__(self, other):
if isinstance(other, VDFDict):
return list(self.items()) == list(other.items())
else:
return False
def __ne__(self, other):
return not self.__eq__(other)
def clear(self):
super(VDFDict, self).clear()
self.__kcount.clear()
self.__omap = list()
def get(self, key, *args):
return super(VDFDict, self).get(self._normalize_key(key), *args)
def setdefault(self, key, default=None):
if key not in self:
self.__setitem__(key, default)
return self.__getitem__(key)
def pop(self, key):
key = self._normalize_key(key)
value = self.__getitem__(key)
self.__delitem__(key)
return value
def popitem(self):
if not self.__omap:
raise KeyError("VDFDict is empty")
key = self.__omap[-1]
return key[1], self.pop(key)
def update(self, data=None, **kwargs):
if isinstance(data, dict):
data = data.items()
elif not isinstance(data, list):
raise TypeError("Expected data to be a list or dict, got %s" % type(data))
for key, value in data:
self.__setitem__(key, value)
def iterkeys(self):
return (key[1] for key in self.__omap)
def keys(self):
return _kView(self)
def itervalues(self):
return (self[key] for key in self.__omap)
def values(self):
return _vView(self)
def iteritems(self):
return ((key[1], self[key]) for key in self.__omap)
def items(self):
return _iView(self)
def get_all_for(self, key):
""" Returns all values of the given key """
if not isinstance(key, _string_type):
raise TypeError("Key needs to be a string.")
return [self[(idx, key)] for idx in _range(self.__kcount[key])]
def remove_all_for(self, key):
""" Removes all items with the given key """
if not isinstance(key, _string_type):
raise TypeError("Key need to be a string.")
for idx in _range(self.__kcount[key]):
super(VDFDict, self).__delitem__((idx, key))
self.__omap = list(filter(lambda x: x[1] != key, self.__omap))
del self.__kcount[key]
def has_duplicates(self):
"""
Returns ``True`` if the dict contains keys with duplicates.
Recurses through any all keys with value that is ``VDFDict``.
"""
for n in getattr(self.__kcount, _iter_values)():
if n != 1:
return True
def dict_recurse(obj):
for v in getattr(obj, _iter_values)():
if isinstance(v, VDFDict) and v.has_duplicates():
return True
elif isinstance(v, dict):
return dict_recurse(v)
return False
return dict_recurse(self)